1#![allow(clippy::large_enum_variant)]
10#![allow(clippy::result_large_err)]
11
12#[macro_use]
13extern crate tracing;
14
15mod behaviour;
16mod bootstrap;
17mod circular_vec;
18mod cmd;
19mod config;
20mod driver;
21mod error;
22mod event;
23mod external_address;
24mod log_markers;
25#[cfg(feature = "open-metrics")]
26mod metrics;
27mod network_builder;
28mod network_discovery;
29mod record_store;
30mod relay_manager;
31mod replication_fetcher;
32pub mod time;
33mod transport;
34
35use cmd::LocalSwarmCmd;
36
37pub use self::{
39 cmd::{NodeIssue, SwarmLocalState},
40 config::ResponseQuorum,
41 driver::SwarmDriver,
42 error::NetworkError,
43 event::{MsgResponder, NetworkEvent},
44 network_builder::{NetworkBuilder, MAX_PACKET_SIZE},
45 record_store::NodeRecordStore,
46};
47#[cfg(feature = "open-metrics")]
48pub use metrics::service::MetricsRegistries;
49pub use time::{interval, sleep, spawn, Instant, Interval};
50
51use self::{cmd::NetworkSwarmCmd, error::Result};
52use ant_evm::{PaymentQuote, QuotingMetrics};
53use ant_protocol::{
54 messages::{ConnectionInfo, Request, Response},
55 storage::ValidationType,
56 NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
57};
58use futures::future::select_all;
59use libp2p::{
60 identity::Keypair,
61 kad::{KBucketDistance, KBucketKey, Record, RecordKey, K_VALUE},
62 multiaddr::Protocol,
63 request_response::OutboundFailure,
64 Multiaddr, PeerId,
65};
66use std::{
67 collections::{BTreeMap, HashMap},
68 net::IpAddr,
69 sync::Arc,
70};
71use tokio::sync::{
72 mpsc::{self, Sender},
73 oneshot,
74};
75
76#[inline]
78pub const fn close_group_majority() -> usize {
79 CLOSE_GROUP_SIZE / 2 + 1
82}
83
84pub fn sort_peers_by_key<T>(
87 peers: Vec<(PeerId, Addresses)>,
88 key: &KBucketKey<T>,
89 expected_entries: usize,
90) -> Result<Vec<(PeerId, Addresses)>> {
91 if CLOSE_GROUP_SIZE > peers.len() {
94 warn!("Not enough peers in the k-bucket to satisfy the request");
95 return Err(NetworkError::NotEnoughPeers {
96 found: peers.len(),
97 required: CLOSE_GROUP_SIZE,
98 });
99 }
100
101 let mut peer_distances: Vec<(PeerId, Addresses, KBucketDistance)> =
104 Vec::with_capacity(peers.len());
105
106 for (peer_id, addrs) in peers.into_iter() {
107 let addr = NetworkAddress::from(peer_id);
108 let distance = key.distance(&addr.as_kbucket_key());
109 peer_distances.push((peer_id, addrs, distance));
110 }
111
112 peer_distances.sort_by(|a, b| a.2.cmp(&b.2));
114
115 let sorted_peers: Vec<(PeerId, Addresses)> = peer_distances
117 .into_iter()
118 .take(expected_entries)
119 .map(|(peer_id, addrs, _)| (peer_id, addrs))
120 .collect();
121
122 Ok(sorted_peers)
123}
124
/// Newtype wrapping the list of `Multiaddr`s associated with a peer.
///
/// The derived `Default` yields an empty list.
#[derive(Clone, Debug, Default)]
pub struct Addresses(pub Vec<Multiaddr>);
128
/// Handle used to talk to the swarm through its command channels.
///
/// The only field is an `Arc`, so cloning a `Network` just bumps a reference count and every
/// clone shares the same `NetworkInner`.
#[derive(Clone, Debug)]
pub struct Network {
    /// Shared state: the command senders, the local peer id and the keypair.
    inner: Arc<NetworkInner>,
}
134
/// State shared by all clones of a `Network` handle.
#[derive(Debug)]
struct NetworkInner {
    /// Channel on which `NetworkSwarmCmd`s are sent.
    network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    /// Channel on which `LocalSwarmCmd`s are sent.
    local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    /// Peer id handed to `Network::new`.
    peer_id: PeerId,
    /// Keypair handed to `Network::new`; used for signing and verification.
    keypair: Keypair,
}
144
145impl Network {
146 pub fn new(
147 network_swarm_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
148 local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
149 peer_id: PeerId,
150 keypair: Keypair,
151 ) -> Self {
152 Self {
153 inner: Arc::new(NetworkInner {
154 network_swarm_cmd_sender,
155 local_swarm_cmd_sender,
156 peer_id,
157 keypair,
158 }),
159 }
160 }
161
162 pub fn peer_id(&self) -> PeerId {
164 self.inner.peer_id
165 }
166
167 pub fn keypair(&self) -> &Keypair {
169 &self.inner.keypair
170 }
171
172 pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender<NetworkSwarmCmd> {
174 &self.inner.network_swarm_cmd_sender
175 }
176 pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender<LocalSwarmCmd> {
178 &self.inner.local_swarm_cmd_sender
179 }
180
181 pub fn sign(&self, msg: &[u8]) -> Result<Vec<u8>> {
183 self.keypair().sign(msg).map_err(NetworkError::from)
184 }
185
186 pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool {
188 self.keypair().public().verify(msg, sig)
189 }
190
191 pub fn get_pub_key(&self) -> Vec<u8> {
193 self.keypair().public().encode_protobuf()
194 }
195
196 pub async fn get_local_peers_with_multiaddr(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>> {
199 let (sender, receiver) = oneshot::channel();
200 self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
201 receiver
202 .await
203 .map_err(|_e| NetworkError::InternalMsgChannelDropped)
204 }
205
206 pub async fn get_kbuckets(&self) -> Result<BTreeMap<u32, Vec<PeerId>>> {
210 let (sender, receiver) = oneshot::channel();
211 self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender });
212 receiver
213 .await
214 .map_err(|_e| NetworkError::InternalMsgChannelDropped)
215 }
216
217 pub async fn get_k_closest_local_peers_to_the_target(
221 &self,
222 key: Option<NetworkAddress>,
223 ) -> Result<Vec<(PeerId, Addresses)>> {
224 let target = if let Some(target) = key {
225 target
226 } else {
227 NetworkAddress::from(self.peer_id())
228 };
229
230 let (sender, receiver) = oneshot::channel();
231 self.send_local_swarm_cmd(LocalSwarmCmd::GetKCloseLocalPeersToTarget {
232 sender,
233 key: target,
234 });
235
236 receiver
237 .await
238 .map_err(|_e| NetworkError::InternalMsgChannelDropped)
239 }
240
241 pub async fn get_local_quoting_metrics(
243 &self,
244 key: RecordKey,
245 data_type: u32,
246 data_size: usize,
247 ) -> Result<(QuotingMetrics, bool)> {
248 let (sender, receiver) = oneshot::channel();
249 self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalQuotingMetrics {
250 key,
251 data_type,
252 data_size,
253 sender,
254 });
255
256 let quoting_metrics = receiver
257 .await
258 .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;
259 Ok(quoting_metrics)
260 }
261
262 pub fn notify_payment_received(&self) {
264 self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived);
265 }
266
267 pub async fn get_local_record(&self, key: &RecordKey) -> Result<Option<Record>> {
269 let (sender, receiver) = oneshot::channel();
270 self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord {
271 key: key.clone(),
272 sender,
273 });
274
275 receiver
276 .await
277 .map_err(|_e| NetworkError::InternalMsgChannelDropped)
278 }
279
280 pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result<bool> {
282 let (sender, receiver) = oneshot::channel();
283 self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender });
284
285 receiver
286 .await
287 .map_err(|_e| NetworkError::InternalMsgChannelDropped)
288 }
289
290 pub fn notify_fetch_completed(&self, key: RecordKey, record_type: ValidationType) {
293 self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type)))
294 }
295
296 pub fn put_local_record(&self, record: Record, is_client_put: bool) {
299 debug!(
300 "Writing Record locally, for {:?} - length {:?}",
301 PrettyPrintRecordKey::from(&record.key),
302 record.value.len()
303 );
304 self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord {
305 record,
306 is_client_put,
307 })
308 }
309
310 pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result<bool> {
312 let (sender, receiver) = oneshot::channel();
313 self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey {
314 key: key.clone(),
315 sender,
316 });
317
318 let is_present = receiver
319 .await
320 .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;
321
322 Ok(is_present)
323 }
324
325 pub async fn get_all_local_record_addresses(
327 &self,
328 ) -> Result<HashMap<NetworkAddress, ValidationType>> {
329 let (sender, receiver) = oneshot::channel();
330 self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender });
331
332 let addrs = receiver
333 .await
334 .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;
335 Ok(addrs)
336 }
337
338 pub async fn send_request(
345 &self,
346 req: Request,
347 peer: PeerId,
348 addrs: Addresses,
349 ) -> Result<(Response, Option<ConnectionInfo>)> {
350 let (sender, receiver) = oneshot::channel();
351 let req_str = format!("{req:?}");
352 self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
354 req: req.clone(),
355 peer,
356 addrs: None,
357 sender: Some(sender),
358 });
359 let mut r = receiver.await?;
360
361 if let Err(error) = &r {
362 error!("Error in response: {:?}", error);
363
364 match error {
365 NetworkError::OutboundError(OutboundFailure::Io(_))
366 | NetworkError::OutboundError(OutboundFailure::ConnectionClosed)
367 | NetworkError::OutboundError(OutboundFailure::DialFailure) => {
368 warn!(
369 "Outbound failed for {req_str} .. {error:?}, dialing it then re-attempt."
370 );
371
372 let dial_addrs = if addrs.0.is_empty() {
376 debug!("Input addrs of {peer:?} is empty, lookup from local");
377 let (sender, receiver) = oneshot::channel();
378
379 self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender });
380 let peers = receiver.await?;
381
382 let Some(new_addrs) = peers
383 .iter()
384 .find(|(id, _addrs)| *id == peer)
385 .map(|(_id, addrs)| addrs.clone())
386 else {
387 error!("Cann't find the addrs of peer {peer:?} from local, during the request reattempt of {req:?}.");
388 return r;
389 };
390 Addresses(new_addrs)
391 } else {
392 addrs.clone()
393 };
394
395 self.send_network_swarm_cmd(NetworkSwarmCmd::DialPeer {
396 peer,
397 addrs: dial_addrs.clone(),
398 });
399
400 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
402
403 let (sender, receiver) = oneshot::channel();
404 debug!("Reattempting to send_request {req_str} to {peer:?} by dialing the addrs manually.");
405 self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
406 req,
407 peer,
408 addrs: Some(dial_addrs),
409 sender: Some(sender),
410 });
411
412 r = receiver.await?;
413 if let Err(error) = &r {
414 error!("Reattempt of {req_str} led to an error again (even after dialing). {error:?}");
415 }
416 }
417 _ => {
418 warn!("Error in response for {req_str}: {error:?}",);
420 }
421 }
422 }
423
424 r
425 }
426
427 pub fn send_response(&self, resp: Response, channel: MsgResponder) {
429 self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel })
430 }
431
432 pub async fn get_swarm_local_state(&self) -> Result<SwarmLocalState> {
434 let (sender, receiver) = oneshot::channel();
435 self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender));
436 let state = receiver.await?;
437 Ok(state)
438 }
439
440 pub fn trigger_interval_replication(&self) {
441 self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication)
442 }
443
444 pub fn add_fresh_records_to_the_replication_fetcher(
446 &self,
447 holder: NetworkAddress,
448 keys: Vec<(NetworkAddress, ValidationType)>,
449 ) {
450 self.send_local_swarm_cmd(LocalSwarmCmd::AddFreshReplicateRecords { holder, keys })
451 }
452
453 pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) {
454 self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue });
455 }
456
457 pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) {
458 self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes });
459 }
460
461 pub fn trigger_irrelevant_record_cleanup(&self) {
462 self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
463 }
464
465 pub fn notify_peer_scores(&self, peer_scores: Vec<(PeerId, bool)>) {
466 self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerScores { peer_scores })
467 }
468
469 pub fn notify_node_version(&self, peer: PeerId, version: String) {
470 self.send_local_swarm_cmd(LocalSwarmCmd::NotifyPeerVersion { peer, version })
471 }
472
473 pub fn remove_peer(&self, peer: PeerId) {
474 self.send_local_swarm_cmd(LocalSwarmCmd::RemovePeer { peer })
475 }
476
477 fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
479 send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
480 }
481
482 fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) {
484 send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd);
485 }
486
487 pub async fn get_closest_peers(
489 &self,
490 key: &NetworkAddress,
491 ) -> Result<Vec<(PeerId, Addresses)>> {
492 let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
493 debug!("Getting the all closest peers in range of {pretty_key:?}");
494 let (sender, receiver) = oneshot::channel();
495 self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
496 key: key.clone(),
497 sender,
498 });
499
500 let closest_peers = receiver.await?;
501
502 if closest_peers.is_empty() {
504 return Err(NetworkError::GetClosestTimedOut);
505 }
506
507 if tracing::level_enabled!(tracing::Level::DEBUG) {
508 let close_peers_pretty_print: Vec<_> = closest_peers
509 .iter()
510 .map(|(peer_id, _)| {
511 format!(
512 "{peer_id:?}({:?})",
513 PrettyPrintKBucketKey(NetworkAddress::from(*peer_id).as_kbucket_key())
514 )
515 })
516 .collect();
517
518 debug!(
519 "Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
520 );
521 }
522
523 Ok(closest_peers)
524 }
525
526 pub async fn get_n_closest_peers(
528 &self,
529 key: &NetworkAddress,
530 n: usize,
531 ) -> Result<Vec<(PeerId, Addresses)>> {
532 assert!(n <= K_VALUE.get());
533
534 let mut closest_peers = self.get_closest_peers(key).await?;
535
536 if closest_peers.len() < n {
538 return Err(NetworkError::NotEnoughPeers {
539 found: closest_peers.len(),
540 required: n,
541 });
542 }
543
544 closest_peers.truncate(n);
546
547 Ok(closest_peers)
548 }
549
550 pub async fn send_and_get_responses(
554 &self,
555 peers: &[(PeerId, Addresses)],
556 req: &Request,
557 get_all_responses: bool,
558 ) -> BTreeMap<PeerId, Result<(Response, Option<ConnectionInfo>)>> {
559 debug!("send_and_get_responses for {req:?}");
560 let mut list_of_futures = peers
561 .iter()
562 .map(|(peer, addrs)| {
563 Box::pin(async {
564 let resp = self.send_request(req.clone(), *peer, addrs.clone()).await;
565 (*peer, resp)
566 })
567 })
568 .collect::<Vec<_>>();
569
570 let mut responses = BTreeMap::new();
571 while !list_of_futures.is_empty() {
572 let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await;
573 let resp_string = match &resp {
574 Ok(resp) => format!("{resp:?}"),
575 Err(err) => format!("{err:?}"),
576 };
577 debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}");
578 if !get_all_responses && resp.is_ok() {
579 return BTreeMap::from([(peer, resp)]);
580 }
581 responses.insert(peer, resp);
582 list_of_futures = remaining_futures;
583 }
584
585 debug!("Received all responses for {req:?}");
586 responses
587 }
588
589 pub async fn get_network_density(&self) -> Result<Option<KBucketDistance>> {
591 let (sender, receiver) = oneshot::channel();
592 self.send_local_swarm_cmd(LocalSwarmCmd::GetNetworkDensity { sender });
593
594 let density = receiver
595 .await
596 .map_err(|_e| NetworkError::InternalMsgChannelDropped)?;
597 Ok(density)
598 }
599}
600
601pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool {
604 !multiaddr.iter().any(|addr| match addr {
605 Protocol::Ip4(ip) => {
606 ip.is_unspecified()
609 | ip.is_private()
610 | ip.is_loopback()
611 | ip.is_link_local()
612 | ip.is_documentation()
613 | ip.is_broadcast()
614 }
615 _ => false,
616 })
617}
618
619pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option<PeerId> {
621 if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
622 let _ = multiaddr.pop();
624 Some(peer_id)
625 } else {
626 None
627 }
628}
629
630pub(crate) fn multiaddr_get_p2p(multiaddr: &Multiaddr) -> Option<PeerId> {
632 if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() {
633 Some(peer_id)
634 } else {
635 None
636 }
637}
638
639pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr {
642 let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit));
643
644 if is_relayed {
645 let mut before_relay_protocol = true;
648 let mut new_multi_addr = Multiaddr::empty();
649 for p in multiaddr.iter() {
650 if matches!(p, Protocol::P2pCircuit) {
651 before_relay_protocol = false;
652 }
653 if matches!(p, Protocol::P2p(_)) && !before_relay_protocol {
654 continue;
655 }
656 new_multi_addr.push(p);
657 }
658 new_multi_addr
659 } else {
660 multiaddr
661 .iter()
662 .filter(|p| !matches!(p, Protocol::P2p(_)))
663 .collect()
664 }
665}
666
667pub(crate) fn craft_valid_multiaddr_without_p2p(addr: &Multiaddr) -> Option<Multiaddr> {
670 let mut new_multiaddr = Multiaddr::empty();
671 let ip = addr.iter().find_map(|p| match p {
672 Protocol::Ip4(addr) => Some(addr),
673 _ => None,
674 })?;
675 let port = multiaddr_get_port(addr)?;
676
677 new_multiaddr.push(Protocol::Ip4(ip));
678 new_multiaddr.push(Protocol::Udp(port));
679 new_multiaddr.push(Protocol::QuicV1);
680
681 Some(new_multiaddr)
682}
683
684pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option<IpAddr> {
686 addr.iter().find_map(|p| match p {
687 Protocol::Ip4(addr) => Some(IpAddr::V4(addr)),
688 Protocol::Ip6(addr) => Some(IpAddr::V6(addr)),
689 _ => None,
690 })
691}
692
693pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option<u16> {
694 addr.iter().find_map(|p| match p {
695 Protocol::Udp(port) => Some(port),
696 _ => None,
697 })
698}
699
700pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender<LocalSwarmCmd>, cmd: LocalSwarmCmd) {
701 let capacity = swarm_cmd_sender.capacity();
702
703 if capacity == 0 {
704 error!(
705 "SwarmCmd channel is full. Await capacity to send: {:?}",
706 cmd
707 );
708 }
709
710 let _handle = spawn(async move {
712 if let Err(error) = swarm_cmd_sender.send(cmd).await {
713 error!("Failed to send SwarmCmd: {}", error);
714 }
715 });
716}
717
718pub(crate) fn send_network_swarm_cmd(
719 swarm_cmd_sender: Sender<NetworkSwarmCmd>,
720 cmd: NetworkSwarmCmd,
721) {
722 let capacity = swarm_cmd_sender.capacity();
723
724 if capacity == 0 {
725 error!(
726 "SwarmCmd channel is full. Await capacity to send: {:?}",
727 cmd
728 );
729 }
730
731 let _handle = spawn(async move {
733 if let Err(error) = swarm_cmd_sender.send(cmd).await {
734 error!("Failed to send SwarmCmd: {}", error);
735 }
736 });
737}