ant_networking/event/
mod.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.
8
9mod identify;
10mod kad;
11mod request_response;
12mod swarm;
13
14use crate::{driver::SwarmDriver, error::Result, relay_manager::is_a_relayed_peer, Addresses};
15use core::fmt;
16use custom_debug::Debug as CustomDebug;
17use libp2p::{
18    kad::{Record, RecordKey, K_VALUE},
19    request_response::ResponseChannel as PeerResponseChannel,
20    Multiaddr, PeerId,
21};
22
23use ant_evm::{PaymentQuote, ProofOfPayment};
24use ant_protocol::messages::ConnectionInfo;
25use ant_protocol::storage::DataTypes;
26#[cfg(feature = "open-metrics")]
27use ant_protocol::CLOSE_GROUP_SIZE;
28use ant_protocol::{
29    messages::{Query, Request, Response},
30    storage::ValidationType,
31    NetworkAddress, PrettyPrintRecordKey,
32};
33#[cfg(feature = "open-metrics")]
34use std::collections::HashSet;
35use std::fmt::Display;
36use std::{
37    collections::BTreeMap,
38    fmt::{Debug, Formatter},
39};
40use tokio::sync::oneshot;
41
/// A point-in-time snapshot of the local Kademlia routing table (kbuckets),
/// produced by `SwarmDriver::get_kbuckets_status`.
#[derive(Debug, Clone)]
pub(crate) struct KBucketStatus {
    // Number of buckets visited while iterating the kbucket table.
    pub(crate) total_buckets: usize,
    // Total number of peer entries across all buckets.
    pub(crate) total_peers: usize,
    // How many of those peers are reached via a relay.
    pub(crate) total_relay_peers: usize,
    // Peers sitting in buckets that hold fewer than K_VALUE entries.
    pub(crate) peers_in_non_full_buckets: usize,
    // Relayed peers in non-full buckets; feeds the relay-percentage metric.
    #[cfg(feature = "open-metrics")]
    pub(crate) relay_peers_in_non_full_buckets: usize,
    // Buckets holding at least K_VALUE entries.
    pub(crate) num_of_full_buckets: usize,
    // Per-bucket stats: (bucket index, entry count, ilog2 of the bucket range start).
    pub(crate) kbucket_table_stats: Vec<(usize, usize, u32)>,
    // Network-size estimate derived from bucket fullness (see `estimate_network_size`).
    pub(crate) estimated_network_size: usize,
}
54
55impl KBucketStatus {
56    pub(crate) fn log(&self) {
57        info!(
58            "kBucketTable has {:?} kbuckets {:?} peers ({} relay peers), {:?}, estimated network size: {:?}",
59            self.total_buckets,
60            self.total_peers,
61            self.total_relay_peers,
62            self.kbucket_table_stats,
63            self.estimated_network_size
64        );
65        #[cfg(feature = "loud")]
66        println!("Estimated network size: {:?}", self.estimated_network_size);
67    }
68}
69
/// NodeEvent enum
///
/// Aggregates the events emitted by each behaviour of the composed swarm so
/// the driver can handle them through a single type. Some payloads are boxed,
/// which keeps the overall enum size down to that of the largest unboxed variant.
#[derive(CustomDebug)]
pub(super) enum NodeEvent {
    /// UPnP port-mapping events.
    Upnp(libp2p::upnp::Event),
    /// Request/response protocol events carrying our `Request`/`Response` messages.
    MsgReceived(libp2p::request_response::Event<Request, Response>),
    /// Kademlia DHT events.
    Kademlia(libp2p::kad::Event),
    /// Identify protocol events (boxed).
    Identify(Box<libp2p::identify::Event>),
    /// Relay client events (boxed).
    RelayClient(Box<libp2p::relay::client::Event>),
    /// Relay server events (boxed).
    RelayServer(Box<libp2p::relay::Event>),
    /// Events from the do-not-disturb behaviour.
    DoNotDisturb(crate::behaviour::do_not_disturb::DoNotDisturbEvent),
    /// Placeholder for behaviours that emit no events (`void::Void` is uninhabited).
    Void(void::Void),
}
82
83impl From<libp2p::upnp::Event> for NodeEvent {
84    fn from(event: libp2p::upnp::Event) -> Self {
85        NodeEvent::Upnp(event)
86    }
87}
88
89impl From<libp2p::request_response::Event<Request, Response>> for NodeEvent {
90    fn from(event: libp2p::request_response::Event<Request, Response>) -> Self {
91        NodeEvent::MsgReceived(event)
92    }
93}
94
95impl From<libp2p::kad::Event> for NodeEvent {
96    fn from(event: libp2p::kad::Event) -> Self {
97        NodeEvent::Kademlia(event)
98    }
99}
100
101impl From<libp2p::identify::Event> for NodeEvent {
102    fn from(event: libp2p::identify::Event) -> Self {
103        NodeEvent::Identify(Box::new(event))
104    }
105}
106impl From<libp2p::relay::client::Event> for NodeEvent {
107    fn from(event: libp2p::relay::client::Event) -> Self {
108        NodeEvent::RelayClient(Box::new(event))
109    }
110}
111impl From<libp2p::relay::Event> for NodeEvent {
112    fn from(event: libp2p::relay::Event) -> Self {
113        NodeEvent::RelayServer(Box::new(event))
114    }
115}
116
117impl From<crate::behaviour::do_not_disturb::DoNotDisturbEvent> for NodeEvent {
118    fn from(event: crate::behaviour::do_not_disturb::DoNotDisturbEvent) -> Self {
119        NodeEvent::DoNotDisturb(event)
120    }
121}
122
123impl From<void::Void> for NodeEvent {
124    fn from(event: void::Void) -> Self {
125        NodeEvent::Void(event)
126    }
127}
128
#[allow(clippy::type_complexity)]
#[derive(CustomDebug)]
/// Channel to send the `Response` through.
pub enum MsgResponder {
    /// Respond to a request from `self` through a simple one-shot channel.
    // NOTE(review): the `Option` wrapper presumably allows the sender to be
    // `take()`n out when responding — confirm against the responder's usage site.
    FromSelf(Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>),
    /// Respond to a request from a peer in the network.
    FromPeer(PeerResponseChannel<Response>),
}
138
/// Events forwarded by the underlying Network; to be used by the upper layers
pub enum NetworkEvent {
    /// Incoming `Query` from a peer
    QueryRequestReceived {
        /// The query itself.
        query: Query,
        /// The channel to send the `Response` through.
        channel: MsgResponder,
    },
    /// Handles the responses that are not awaited at the call site
    ResponseReceived {
        /// The received response.
        res: Response,
    },
    /// Peer has been added to the Routing Table. And the number of connected peers.
    PeerAdded(PeerId, usize),
    /// Peer has been removed from the Routing Table. And the number of connected peers.
    PeerRemoved(PeerId, usize),
    /// The peer does not support our protocol
    PeerWithUnsupportedProtocol {
        // Our protocol identifier string.
        our_protocol: String,
        // The protocol identifier string the peer advertised.
        their_protocol: String,
    },
    /// The records bearing these keys are to be fetched from the holder or the network
    KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
    /// Started listening on a new address
    NewListenAddr(Multiaddr),
    /// Report unverified record
    UnverifiedRecord(Record),
    /// Terminate Node on unrecoverable errors
    TerminateNode { reason: TerminateNodeReason },
    /// List of peer nodes that failed to fetch replication copy from.
    FailedToFetchHolders(BTreeMap<PeerId, RecordKey>),
    /// Quotes to be verified
    QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
    /// Fresh replicate to fetch
    FreshReplicateToFetch {
        // The peer holding the fresh copies to replicate.
        holder: NetworkAddress,
        // Keys to fetch, with their data type, validation type, and optional payment proof.
        keys: Vec<(
            NetworkAddress,
            DataTypes,
            ValidationType,
            Option<ProofOfPayment>,
        )>,
    },
    /// Peers of picked bucket for version query.
    PeersForVersionQuery(Vec<(PeerId, Addresses)>),
}
187
/// Terminate node for the following reason
#[derive(Debug, Clone)]
pub enum TerminateNodeReason {
    /// Writing to the hard disk failed.
    HardDiskWriteError,
    /// No UPnP gateway was found, so incoming connections cannot be set up automatically.
    UpnpGatewayNotFound,
}
194
195// Manually implement Debug as `#[debug(with = "unverified_record_fmt")]` not working as expected.
196impl Debug for NetworkEvent {
197    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
198        match self {
199            NetworkEvent::QueryRequestReceived { query, .. } => {
200                write!(f, "NetworkEvent::QueryRequestReceived({query:?})")
201            }
202            NetworkEvent::ResponseReceived { res, .. } => {
203                write!(f, "NetworkEvent::ResponseReceived({res:?})")
204            }
205            NetworkEvent::PeerAdded(peer_id, connected_peers) => {
206                write!(f, "NetworkEvent::PeerAdded({peer_id:?}, {connected_peers})")
207            }
208            NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
209                write!(
210                    f,
211                    "NetworkEvent::PeerRemoved({peer_id:?}, {connected_peers})"
212                )
213            }
214            NetworkEvent::PeerWithUnsupportedProtocol {
215                our_protocol,
216                their_protocol,
217            } => {
218                write!(f, "NetworkEvent::PeerWithUnsupportedProtocol({our_protocol:?}, {their_protocol:?})")
219            }
220            NetworkEvent::KeysToFetchForReplication(list) => {
221                let keys_len = list.len();
222                write!(f, "NetworkEvent::KeysForReplication({keys_len:?})")
223            }
224            NetworkEvent::NewListenAddr(addr) => {
225                write!(f, "NetworkEvent::NewListenAddr({addr:?})")
226            }
227            NetworkEvent::UnverifiedRecord(record) => {
228                let pretty_key = PrettyPrintRecordKey::from(&record.key);
229                write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})")
230            }
231            NetworkEvent::TerminateNode { reason } => {
232                write!(f, "NetworkEvent::TerminateNode({reason:?})")
233            }
234            NetworkEvent::FailedToFetchHolders(bad_nodes) => {
235                let pretty_log: Vec<_> = bad_nodes
236                    .iter()
237                    .map(|(peer_id, record_key)| {
238                        let pretty_key = PrettyPrintRecordKey::from(record_key);
239                        (peer_id, pretty_key)
240                    })
241                    .collect();
242                write!(f, "NetworkEvent::FailedToFetchHolders({pretty_log:?})")
243            }
244            NetworkEvent::QuoteVerification { quotes } => {
245                write!(
246                    f,
247                    "NetworkEvent::QuoteVerification({} quotes)",
248                    quotes.len()
249                )
250            }
251            NetworkEvent::FreshReplicateToFetch { holder, keys } => {
252                write!(
253                    f,
254                    "NetworkEvent::FreshReplicateToFetch({holder:?}, {keys:?})"
255                )
256            }
257            NetworkEvent::PeersForVersionQuery(peers) => {
258                write!(
259                    f,
260                    "NetworkEvent::PeersForVersionQuery({:?})",
261                    peers
262                        .iter()
263                        .map(|(peer, _addrs)| peer)
264                        .collect::<Vec<&PeerId>>()
265                )
266            }
267        }
268    }
269}
270
271impl Display for TerminateNodeReason {
272    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
273        match self {
274            TerminateNodeReason::HardDiskWriteError => {
275                write!(f, "HardDiskWriteError")
276            }
277            TerminateNodeReason::UpnpGatewayNotFound => {
278                write!(f, "UPnP gateway not found. Enable UPnP on your router to allow incoming connections or manually port forward.")
279            }
280        }
281    }
282}
283
284impl SwarmDriver {
285    /// Check for changes in our close group
286    #[cfg(feature = "open-metrics")]
287    pub(crate) fn check_for_change_in_our_close_group(&mut self) {
288        // this includes self
289        let closest_k_peers = self.get_closest_k_local_peers_to_self();
290
291        let new_closest_peers: Vec<PeerId> = closest_k_peers
292            .into_iter()
293            .map(|(peer_id, _)| peer_id)
294            .take(CLOSE_GROUP_SIZE)
295            .collect();
296
297        let old = self.close_group.iter().cloned().collect::<HashSet<_>>();
298        let new_members: Vec<_> = new_closest_peers
299            .iter()
300            .filter(|p| !old.contains(p))
301            .collect();
302        if !new_members.is_empty() {
303            debug!("The close group has been updated. The new members are {new_members:?}");
304            debug!("New close group: {new_closest_peers:?}");
305            self.close_group = new_closest_peers.clone();
306            self.record_change_in_close_group(new_closest_peers);
307        }
308    }
309
310    /// Update state on addition of a peer to the routing table.
311    pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) {
312        let kbucket_status = self.get_kbuckets_status();
313        self.update_on_kbucket_status(&kbucket_status);
314
315        let distance =
316            NetworkAddress::from(self.self_peer_id).distance(&NetworkAddress::from(added_peer));
317        info!("Node {:?} added new peer into routing table: {added_peer:?}. It has a {:?} distance to us.", 
318        self.self_peer_id, distance.ilog2());
319
320        #[cfg(feature = "loud")]
321        println!(
322            "New peer added to routing table: {added_peer:?}, now we have #{} connected peers",
323            self.peers_in_rt
324        );
325
326        kbucket_status.log();
327
328        if let Some(bootstrap_cache) = &mut self.bootstrap_cache {
329            for addr in addresses.0.iter() {
330                bootstrap_cache.add_addr(addr.clone());
331            }
332        }
333
334        self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt));
335
336        #[cfg(feature = "open-metrics")]
337        if self.metrics_recorder.is_some() {
338            self.check_for_change_in_our_close_group();
339        }
340    }
341
342    /// Update state on removal of a peer from the routing table.
343    pub(crate) fn update_on_peer_removal(&mut self, removed_peer: PeerId) {
344        let kbucket_status = self.get_kbuckets_status();
345        self.update_on_kbucket_status(&kbucket_status);
346
347        // ensure we disconnect bad peer
348        // err result just means no connections were open
349        let _result = self.swarm.disconnect_peer_id(removed_peer);
350
351        let distance =
352            NetworkAddress::from(self.self_peer_id).distance(&NetworkAddress::from(removed_peer));
353        info!(
354            "Peer removed from routing table: {removed_peer:?}. We now have #{} connected peers. It has a {:?} distance to us.",
355            self.peers_in_rt, distance.ilog2()
356        );
357
358        self.send_event(NetworkEvent::PeerRemoved(removed_peer, self.peers_in_rt));
359
360        kbucket_status.log();
361
362        #[cfg(feature = "open-metrics")]
363        if self.metrics_recorder.is_some() {
364            self.check_for_change_in_our_close_group();
365        }
366    }
367
368    /// Get the status of the kbucket table.
369    pub(crate) fn get_kbuckets_status(&mut self) -> KBucketStatus {
370        let mut kbucket_table_stats = vec![];
371        let mut index = 0;
372        let mut total_peers = 0;
373        let mut total_relay_peers = 0;
374
375        let mut peers_in_non_full_buckets = 0;
376        let mut relay_peers_in_non_full_buckets = 0;
377        let mut num_of_full_buckets = 0;
378
379        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
380            let range = kbucket.range();
381            let num_entires = kbucket.num_entries();
382
383            kbucket.iter().for_each(|entry| {
384                if is_a_relayed_peer(entry.node.value.iter()) {
385                    total_relay_peers += 1;
386                    if num_entires < K_VALUE.get() {
387                        relay_peers_in_non_full_buckets += 1;
388                    }
389                }
390            });
391
392            if num_entires >= K_VALUE.get() {
393                num_of_full_buckets += 1;
394            } else {
395                peers_in_non_full_buckets += num_entires;
396            }
397
398            total_peers += num_entires;
399            if let Some(distance) = range.0.ilog2() {
400                kbucket_table_stats.push((index, num_entires, distance));
401            } else {
402                // This shall never happen.
403                error!("bucket #{index:?} is ourself ???!!!");
404            }
405            index += 1;
406        }
407
408        let estimated_network_size =
409            Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
410
411        KBucketStatus {
412            total_buckets: index,
413            total_peers,
414            total_relay_peers,
415            peers_in_non_full_buckets,
416            #[cfg(feature = "open-metrics")]
417            relay_peers_in_non_full_buckets,
418            num_of_full_buckets,
419            kbucket_table_stats,
420            estimated_network_size,
421        }
422    }
423
424    /// Update SwarmDriver field & also record metrics based on the newly calculated kbucket status.
425    pub(crate) fn update_on_kbucket_status(&mut self, status: &KBucketStatus) {
426        self.peers_in_rt = status.total_peers;
427        #[cfg(feature = "open-metrics")]
428        if let Some(metrics_recorder) = &self.metrics_recorder {
429            metrics_recorder
430                .peers_in_routing_table
431                .set(status.total_peers as i64);
432
433            let _ = metrics_recorder
434                .relay_peers_in_routing_table
435                .set(status.total_relay_peers as i64);
436
437            let estimated_network_size = Self::estimate_network_size(
438                status.peers_in_non_full_buckets,
439                status.num_of_full_buckets,
440            );
441            let _ = metrics_recorder
442                .estimated_network_size
443                .set(estimated_network_size as i64);
444
445            let _ = metrics_recorder.relay_peers_percentage.set(
446                (status.relay_peers_in_non_full_buckets as f64
447                    / status.peers_in_non_full_buckets as f64)
448                    * 100.0,
449            );
450        }
451    }
452
453    /// Estimate the number of nodes in the network
454    pub(crate) fn estimate_network_size(
455        peers_in_non_full_buckets: usize,
456        num_of_full_buckets: usize,
457    ) -> usize {
458        (peers_in_non_full_buckets + 1) * (2_usize.pow(num_of_full_buckets as u32))
459    }
460}