1mod identify;
10mod kad;
11mod request_response;
12mod swarm;
13
14use crate::{driver::SwarmDriver, error::Result, relay_manager::is_a_relayed_peer, Addresses};
15use core::fmt;
16use custom_debug::Debug as CustomDebug;
17use libp2p::{
18 kad::{Record, RecordKey, K_VALUE},
19 request_response::ResponseChannel as PeerResponseChannel,
20 Multiaddr, PeerId,
21};
22
23use ant_evm::{PaymentQuote, ProofOfPayment};
24use ant_protocol::messages::ConnectionInfo;
25use ant_protocol::storage::DataTypes;
26#[cfg(feature = "open-metrics")]
27use ant_protocol::CLOSE_GROUP_SIZE;
28use ant_protocol::{
29 messages::{Query, Request, Response},
30 storage::ValidationType,
31 NetworkAddress, PrettyPrintRecordKey,
32};
33#[cfg(feature = "open-metrics")]
34use std::collections::HashSet;
35use std::fmt::Display;
36use std::{
37 collections::BTreeMap,
38 fmt::{Debug, Formatter},
39};
40use tokio::sync::oneshot;
41
42#[derive(Debug, Clone)]
43pub(crate) struct KBucketStatus {
44 pub(crate) total_buckets: usize,
45 pub(crate) total_peers: usize,
46 pub(crate) total_relay_peers: usize,
47 pub(crate) peers_in_non_full_buckets: usize,
48 #[cfg(feature = "open-metrics")]
49 pub(crate) relay_peers_in_non_full_buckets: usize,
50 pub(crate) num_of_full_buckets: usize,
51 pub(crate) kbucket_table_stats: Vec<(usize, usize, u32)>,
52 pub(crate) estimated_network_size: usize,
53}
54
55impl KBucketStatus {
56 pub(crate) fn log(&self) {
57 info!(
58 "kBucketTable has {:?} kbuckets {:?} peers ({} relay peers), {:?}, estimated network size: {:?}",
59 self.total_buckets,
60 self.total_peers,
61 self.total_relay_peers,
62 self.kbucket_table_stats,
63 self.estimated_network_size
64 );
65 #[cfg(feature = "loud")]
66 println!("Estimated network size: {:?}", self.estimated_network_size);
67 }
68}
69
70#[derive(CustomDebug)]
72pub(super) enum NodeEvent {
73 Upnp(libp2p::upnp::Event),
74 MsgReceived(libp2p::request_response::Event<Request, Response>),
75 Kademlia(libp2p::kad::Event),
76 Identify(Box<libp2p::identify::Event>),
77 RelayClient(Box<libp2p::relay::client::Event>),
78 RelayServer(Box<libp2p::relay::Event>),
79 DoNotDisturb(crate::behaviour::do_not_disturb::DoNotDisturbEvent),
80 Void(void::Void),
81}
82
83impl From<libp2p::upnp::Event> for NodeEvent {
84 fn from(event: libp2p::upnp::Event) -> Self {
85 NodeEvent::Upnp(event)
86 }
87}
88
89impl From<libp2p::request_response::Event<Request, Response>> for NodeEvent {
90 fn from(event: libp2p::request_response::Event<Request, Response>) -> Self {
91 NodeEvent::MsgReceived(event)
92 }
93}
94
95impl From<libp2p::kad::Event> for NodeEvent {
96 fn from(event: libp2p::kad::Event) -> Self {
97 NodeEvent::Kademlia(event)
98 }
99}
100
101impl From<libp2p::identify::Event> for NodeEvent {
102 fn from(event: libp2p::identify::Event) -> Self {
103 NodeEvent::Identify(Box::new(event))
104 }
105}
106impl From<libp2p::relay::client::Event> for NodeEvent {
107 fn from(event: libp2p::relay::client::Event) -> Self {
108 NodeEvent::RelayClient(Box::new(event))
109 }
110}
111impl From<libp2p::relay::Event> for NodeEvent {
112 fn from(event: libp2p::relay::Event) -> Self {
113 NodeEvent::RelayServer(Box::new(event))
114 }
115}
116
117impl From<crate::behaviour::do_not_disturb::DoNotDisturbEvent> for NodeEvent {
118 fn from(event: crate::behaviour::do_not_disturb::DoNotDisturbEvent) -> Self {
119 NodeEvent::DoNotDisturb(event)
120 }
121}
122
123impl From<void::Void> for NodeEvent {
124 fn from(event: void::Void) -> Self {
125 NodeEvent::Void(event)
126 }
127}
128
129#[allow(clippy::type_complexity)]
130#[derive(CustomDebug)]
131pub enum MsgResponder {
133 FromSelf(Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>),
135 FromPeer(PeerResponseChannel<Response>),
137}
138
139pub enum NetworkEvent {
141 QueryRequestReceived {
143 query: Query,
145 channel: MsgResponder,
147 },
148 ResponseReceived {
150 res: Response,
152 },
153 PeerAdded(PeerId, usize),
155 PeerRemoved(PeerId, usize),
157 PeerWithUnsupportedProtocol {
159 our_protocol: String,
160 their_protocol: String,
161 },
162 KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
164 NewListenAddr(Multiaddr),
166 UnverifiedRecord(Record),
168 TerminateNode { reason: TerminateNodeReason },
170 FailedToFetchHolders(BTreeMap<PeerId, RecordKey>),
172 QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
174 FreshReplicateToFetch {
176 holder: NetworkAddress,
177 keys: Vec<(
178 NetworkAddress,
179 DataTypes,
180 ValidationType,
181 Option<ProofOfPayment>,
182 )>,
183 },
184 PeersForVersionQuery(Vec<(PeerId, Addresses)>),
186}
187
188#[derive(Debug, Clone)]
190pub enum TerminateNodeReason {
191 HardDiskWriteError,
192 UpnpGatewayNotFound,
193}
194
195impl Debug for NetworkEvent {
197 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
198 match self {
199 NetworkEvent::QueryRequestReceived { query, .. } => {
200 write!(f, "NetworkEvent::QueryRequestReceived({query:?})")
201 }
202 NetworkEvent::ResponseReceived { res, .. } => {
203 write!(f, "NetworkEvent::ResponseReceived({res:?})")
204 }
205 NetworkEvent::PeerAdded(peer_id, connected_peers) => {
206 write!(f, "NetworkEvent::PeerAdded({peer_id:?}, {connected_peers})")
207 }
208 NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
209 write!(
210 f,
211 "NetworkEvent::PeerRemoved({peer_id:?}, {connected_peers})"
212 )
213 }
214 NetworkEvent::PeerWithUnsupportedProtocol {
215 our_protocol,
216 their_protocol,
217 } => {
218 write!(f, "NetworkEvent::PeerWithUnsupportedProtocol({our_protocol:?}, {their_protocol:?})")
219 }
220 NetworkEvent::KeysToFetchForReplication(list) => {
221 let keys_len = list.len();
222 write!(f, "NetworkEvent::KeysForReplication({keys_len:?})")
223 }
224 NetworkEvent::NewListenAddr(addr) => {
225 write!(f, "NetworkEvent::NewListenAddr({addr:?})")
226 }
227 NetworkEvent::UnverifiedRecord(record) => {
228 let pretty_key = PrettyPrintRecordKey::from(&record.key);
229 write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})")
230 }
231 NetworkEvent::TerminateNode { reason } => {
232 write!(f, "NetworkEvent::TerminateNode({reason:?})")
233 }
234 NetworkEvent::FailedToFetchHolders(bad_nodes) => {
235 let pretty_log: Vec<_> = bad_nodes
236 .iter()
237 .map(|(peer_id, record_key)| {
238 let pretty_key = PrettyPrintRecordKey::from(record_key);
239 (peer_id, pretty_key)
240 })
241 .collect();
242 write!(f, "NetworkEvent::FailedToFetchHolders({pretty_log:?})")
243 }
244 NetworkEvent::QuoteVerification { quotes } => {
245 write!(
246 f,
247 "NetworkEvent::QuoteVerification({} quotes)",
248 quotes.len()
249 )
250 }
251 NetworkEvent::FreshReplicateToFetch { holder, keys } => {
252 write!(
253 f,
254 "NetworkEvent::FreshReplicateToFetch({holder:?}, {keys:?})"
255 )
256 }
257 NetworkEvent::PeersForVersionQuery(peers) => {
258 write!(
259 f,
260 "NetworkEvent::PeersForVersionQuery({:?})",
261 peers
262 .iter()
263 .map(|(peer, _addrs)| peer)
264 .collect::<Vec<&PeerId>>()
265 )
266 }
267 }
268 }
269}
270
271impl Display for TerminateNodeReason {
272 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
273 match self {
274 TerminateNodeReason::HardDiskWriteError => {
275 write!(f, "HardDiskWriteError")
276 }
277 TerminateNodeReason::UpnpGatewayNotFound => {
278 write!(f, "UPnP gateway not found. Enable UPnP on your router to allow incoming connections or manually port forward.")
279 }
280 }
281 }
282}
283
284impl SwarmDriver {
285 #[cfg(feature = "open-metrics")]
287 pub(crate) fn check_for_change_in_our_close_group(&mut self) {
288 let closest_k_peers = self.get_closest_k_local_peers_to_self();
290
291 let new_closest_peers: Vec<PeerId> = closest_k_peers
292 .into_iter()
293 .map(|(peer_id, _)| peer_id)
294 .take(CLOSE_GROUP_SIZE)
295 .collect();
296
297 let old = self.close_group.iter().cloned().collect::<HashSet<_>>();
298 let new_members: Vec<_> = new_closest_peers
299 .iter()
300 .filter(|p| !old.contains(p))
301 .collect();
302 if !new_members.is_empty() {
303 debug!("The close group has been updated. The new members are {new_members:?}");
304 debug!("New close group: {new_closest_peers:?}");
305 self.close_group = new_closest_peers.clone();
306 self.record_change_in_close_group(new_closest_peers);
307 }
308 }
309
310 pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) {
312 let kbucket_status = self.get_kbuckets_status();
313 self.update_on_kbucket_status(&kbucket_status);
314
315 let distance =
316 NetworkAddress::from(self.self_peer_id).distance(&NetworkAddress::from(added_peer));
317 info!("Node {:?} added new peer into routing table: {added_peer:?}. It has a {:?} distance to us.",
318 self.self_peer_id, distance.ilog2());
319
320 #[cfg(feature = "loud")]
321 println!(
322 "New peer added to routing table: {added_peer:?}, now we have #{} connected peers",
323 self.peers_in_rt
324 );
325
326 kbucket_status.log();
327
328 if let Some(bootstrap_cache) = &mut self.bootstrap_cache {
329 for addr in addresses.0.iter() {
330 bootstrap_cache.add_addr(addr.clone());
331 }
332 }
333
334 self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt));
335
336 #[cfg(feature = "open-metrics")]
337 if self.metrics_recorder.is_some() {
338 self.check_for_change_in_our_close_group();
339 }
340 }
341
342 pub(crate) fn update_on_peer_removal(&mut self, removed_peer: PeerId) {
344 let kbucket_status = self.get_kbuckets_status();
345 self.update_on_kbucket_status(&kbucket_status);
346
347 let _result = self.swarm.disconnect_peer_id(removed_peer);
350
351 let distance =
352 NetworkAddress::from(self.self_peer_id).distance(&NetworkAddress::from(removed_peer));
353 info!(
354 "Peer removed from routing table: {removed_peer:?}. We now have #{} connected peers. It has a {:?} distance to us.",
355 self.peers_in_rt, distance.ilog2()
356 );
357
358 self.send_event(NetworkEvent::PeerRemoved(removed_peer, self.peers_in_rt));
359
360 kbucket_status.log();
361
362 #[cfg(feature = "open-metrics")]
363 if self.metrics_recorder.is_some() {
364 self.check_for_change_in_our_close_group();
365 }
366 }
367
368 pub(crate) fn get_kbuckets_status(&mut self) -> KBucketStatus {
370 let mut kbucket_table_stats = vec![];
371 let mut index = 0;
372 let mut total_peers = 0;
373 let mut total_relay_peers = 0;
374
375 let mut peers_in_non_full_buckets = 0;
376 let mut relay_peers_in_non_full_buckets = 0;
377 let mut num_of_full_buckets = 0;
378
379 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
380 let range = kbucket.range();
381 let num_entires = kbucket.num_entries();
382
383 kbucket.iter().for_each(|entry| {
384 if is_a_relayed_peer(entry.node.value.iter()) {
385 total_relay_peers += 1;
386 if num_entires < K_VALUE.get() {
387 relay_peers_in_non_full_buckets += 1;
388 }
389 }
390 });
391
392 if num_entires >= K_VALUE.get() {
393 num_of_full_buckets += 1;
394 } else {
395 peers_in_non_full_buckets += num_entires;
396 }
397
398 total_peers += num_entires;
399 if let Some(distance) = range.0.ilog2() {
400 kbucket_table_stats.push((index, num_entires, distance));
401 } else {
402 error!("bucket #{index:?} is ourself ???!!!");
404 }
405 index += 1;
406 }
407
408 let estimated_network_size =
409 Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
410
411 KBucketStatus {
412 total_buckets: index,
413 total_peers,
414 total_relay_peers,
415 peers_in_non_full_buckets,
416 #[cfg(feature = "open-metrics")]
417 relay_peers_in_non_full_buckets,
418 num_of_full_buckets,
419 kbucket_table_stats,
420 estimated_network_size,
421 }
422 }
423
424 pub(crate) fn update_on_kbucket_status(&mut self, status: &KBucketStatus) {
426 self.peers_in_rt = status.total_peers;
427 #[cfg(feature = "open-metrics")]
428 if let Some(metrics_recorder) = &self.metrics_recorder {
429 metrics_recorder
430 .peers_in_routing_table
431 .set(status.total_peers as i64);
432
433 let _ = metrics_recorder
434 .relay_peers_in_routing_table
435 .set(status.total_relay_peers as i64);
436
437 let estimated_network_size = Self::estimate_network_size(
438 status.peers_in_non_full_buckets,
439 status.num_of_full_buckets,
440 );
441 let _ = metrics_recorder
442 .estimated_network_size
443 .set(estimated_network_size as i64);
444
445 let _ = metrics_recorder.relay_peers_percentage.set(
446 (status.relay_peers_in_non_full_buckets as f64
447 / status.peers_in_non_full_buckets as f64)
448 * 100.0,
449 );
450 }
451 }
452
453 pub(crate) fn estimate_network_size(
455 peers_in_non_full_buckets: usize,
456 num_of_full_buckets: usize,
457 ) -> usize {
458 (peers_in_non_full_buckets + 1) * (2_usize.pow(num_of_full_buckets as u32))
459 }
460}