1#[cfg(feature = "open-metrics")]
10use crate::metrics::NetworkMetricsRecorder;
11use crate::{
12 bootstrap::{InitialBootstrap, InitialBootstrapTrigger, INITIAL_BOOTSTRAP_CHECK_INTERVAL},
13 circular_vec::CircularVec,
14 cmd::{LocalSwarmCmd, NetworkSwarmCmd},
15 driver::kad::U256,
16 error::Result,
17 event::{NetworkEvent, NodeEvent},
18 external_address::ExternalAddressManager,
19 log_markers::Marker,
20 network_discovery::{NetworkDiscovery, NETWORK_DISCOVER_INTERVAL},
21 relay_manager::RelayManager,
22 replication_fetcher::ReplicationFetcher,
23 time::{interval, spawn, Instant, Interval},
24 Addresses, NodeIssue, NodeRecordStore, CLOSE_GROUP_SIZE,
25};
26use ant_bootstrap::BootstrapCacheStore;
27use ant_evm::PaymentQuote;
28use ant_protocol::messages::ConnectionInfo;
29use ant_protocol::{
30 messages::{Request, Response},
31 NetworkAddress,
32};
33use futures::StreamExt;
34use libp2p::{
35 kad::{self, KBucketDistance as Distance, QueryId, K_VALUE},
36 request_response::OutboundRequestId,
37 swarm::{
38 dial_opts::{DialOpts, PeerCondition},
39 ConnectionId, Swarm,
40 },
41 Multiaddr, PeerId,
42};
43use libp2p::{
44 request_response,
45 swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent},
46};
47use rand::Rng;
48use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet};
49use tokio::sync::{mpsc, oneshot, watch};
50use tokio::time::Duration;
51use tracing::warn;
52
/// Peers flagged with issues: each entry keeps the recorded `(NodeIssue, Instant)`
/// history plus a flag — NOTE(review): presumably "already considered bad/blocked";
/// confirm against the sites that set it.
pub(crate) type BadNodes = BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>;

/// How often the responsible/replication distance range is re-evaluated.
/// NOTE(review): "CLOSET" is a typo of "CLOSEST" but renaming would break other callers.
pub(crate) const CLOSET_RECORD_CHECK_INTERVAL: Duration = Duration::from_secs(15);

/// How often the relay manager retries connecting to relays for reservations.
pub(crate) const RELAY_MANAGER_RESERVATION_INTERVAL: Duration = Duration::from_secs(30);

/// How often the pending dial queue is scanned for peers whose wait time has elapsed.
const DIAL_QUEUE_CHECK_INTERVAL: Duration = Duration::from_secs(2);

/// Who initiated a get-closest-peers query, and therefore where its results go.
pub(crate) enum PendingGetClosestType {
    /// Query launched by the periodic network-discovery process; results stay internal.
    NetworkDiscovery,
    /// Query launched on behalf of a caller; results are delivered over this channel.
    FunctionCall(oneshot::Sender<Vec<(PeerId, Addresses)>>),
}
/// In-flight get-closest queries keyed by query id, with peers accumulated so far.
type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, Vec<(PeerId, Addresses)>)>;
76
/// Required so `Infallible`-producing sub-behaviours fit the derived `NodeEvent`.
/// `Infallible` has no values, so this conversion can never actually execute.
impl From<std::convert::Infallible> for NodeEvent {
    fn from(_: std::convert::Infallible) -> Self {
        // Unreachable by construction: there is no value of type `Infallible`.
        panic!("NodeBehaviour is not Infallible!")
    }
}
82
/// The composed libp2p behaviours of the node. The derive aggregates each
/// sub-behaviour's events into `NodeEvent` (see the `to_swarm` attribute).
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NodeEvent")]
pub(super) struct NodeBehaviour {
    /// Rejects connections from peers on the block list.
    pub(super) blocklist:
        libp2p::allow_block_list::Behaviour<libp2p::allow_block_list::BlockedPeers>,
    /// Crate-local behaviour — semantics defined in `crate::behaviour::do_not_disturb`.
    pub(super) do_not_disturb: crate::behaviour::do_not_disturb::Behaviour,
    /// Peer identification/exchange of listen addresses and protocol versions.
    pub(super) identify: libp2p::identify::Behaviour,
    /// UPnP port mapping; `Toggle` allows it to be disabled at construction.
    pub(super) upnp: Toggle<libp2p::upnp::tokio::Behaviour>,
    /// Client side of the relay protocol (dialing through relays).
    pub(super) relay_client: libp2p::relay::client::Behaviour,
    /// Server side of the relay protocol; `Toggle` allows it to be disabled.
    pub(super) relay_server: Toggle<libp2p::relay::Behaviour>,
    /// Kademlia DHT backed by this node's record store.
    pub(super) kademlia: kad::Behaviour<NodeRecordStore>,
    /// CBOR-encoded request/response protocol carrying `Request`/`Response`.
    pub(super) request_response: request_response::cbor::Behaviour<Request, Response>,
}
99
/// Owns the libp2p `Swarm` and drives it, together with command channels and
/// periodic maintenance state, from a single event loop (see [`SwarmDriver::run`]).
pub struct SwarmDriver {
    pub(crate) swarm: Swarm<NodeBehaviour>,
    pub(crate) self_peer_id: PeerId,
    /// Whether the node runs in "local" mode — NOTE(review): confirm exact
    /// semantics (likely local testnet) at the construction site.
    pub(crate) local: bool,
    pub(crate) is_relay_client: bool,
    /// Current close group of this node (metrics builds only).
    #[cfg(feature = "open-metrics")]
    pub(crate) close_group: Vec<PeerId>,
    /// Number of peers currently in the routing table.
    pub(crate) peers_in_rt: usize,
    /// One-time initial bootstrap process and its trigger condition.
    pub(crate) initial_bootstrap: InitialBootstrap,
    pub(crate) initial_bootstrap_trigger: InitialBootstrapTrigger,
    pub(crate) network_discovery: NetworkDiscovery,
    /// Optional persisted cache of bootstrap peers; `None` when caching is disabled.
    pub(crate) bootstrap_cache: Option<BootstrapCacheStore>,
    pub(crate) external_address_manager: Option<ExternalAddressManager>,
    pub(crate) relay_manager: Option<RelayManager>,
    /// Peers currently connected to us through our relay server role.
    pub(crate) connected_relay_clients: HashSet<PeerId>,
    pub(crate) replication_fetcher: ReplicationFetcher,
    #[cfg(feature = "open-metrics")]
    pub(crate) metrics_recorder: Option<NetworkMetricsRecorder>,

    // Command/event plumbing: senders are cloned into spawned tasks, receivers
    // are polled in the `run` loop.
    pub(crate) network_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    pub(crate) local_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    pub(crate) local_cmd_receiver: mpsc::Receiver<LocalSwarmCmd>,
    pub(crate) network_cmd_receiver: mpsc::Receiver<NetworkSwarmCmd>,
    pub(crate) event_sender: mpsc::Sender<NetworkEvent>,
    /// In-flight get-closest-peers queries and their accumulated results.
    pub(crate) pending_get_closest_peers: PendingGetClosest,
    /// In-flight outbound request/response calls; `None` when no reply is awaited.
    #[allow(clippy::type_complexity)]
    pub(crate) pending_requests: HashMap<
        OutboundRequestId,
        Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>,
    >,
    /// Bounded history of peers we have dialed.
    pub(crate) dialed_peers: CircularVec<PeerId>,
    /// Peers scheduled to be dialed: addresses, earliest dial time, and a
    /// counter — NOTE(review): presumably the number of reschedules; confirm.
    pub(crate) dial_queue: HashMap<PeerId, (Addresses, Instant, usize)>,
    /// Live connections keyed by connection id, with peer, address and timestamp.
    pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Multiaddr, Instant)>,
    pub(crate) latest_established_connection_ids: HashMap<usize, (Multiaddr, Instant)>,
    /// Per-kind handling durations, periodically summarised by `log_handling`.
    pub(crate) handling_statistics: BTreeMap<String, Vec<Duration>>,
    pub(crate) handled_times: usize,
    pub(crate) hard_disk_write_error: usize,
    pub(crate) bad_nodes: BadNodes,
    pub(crate) quotes_history: BTreeMap<PeerId, PaymentQuote>,
    pub(crate) replication_targets: BTreeMap<PeerId, Instant>,
    pub(crate) last_replication: Option<Instant>,
    pub(crate) last_connection_pruning_time: Instant,
    /// Reported software version per peer; pruned in the discovery tick.
    pub(crate) peers_version: HashMap<PeerId, String>,
}
159
160impl SwarmDriver {
    /// Drive the swarm: process local/network commands, swarm events and all
    /// periodic maintenance tasks until a shutdown signal is received.
    pub async fn run(mut self, mut shutdown_rx: watch::Receiver<bool>) {
        // Periodic maintenance timers.
        let mut network_discover_interval = interval(NETWORK_DISCOVER_INTERVAL);
        let mut set_farthest_record_interval = interval(CLOSET_RECORD_CHECK_INTERVAL);
        let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);
        // Wrapped in Option so the check can be disabled after bootstrap triggers once.
        let mut initial_bootstrap_trigger_check_interval =
            Some(interval(INITIAL_BOOTSTRAP_CHECK_INTERVAL));
        let mut dial_queue_check_interval = interval(DIAL_QUEUE_CHECK_INTERVAL);
        // Consume the immediate first tick so the first real check is one period away.
        dial_queue_check_interval.tick().await;
        // Cache saving is optional: disabled when there is no cache or writing is off.
        let mut bootstrap_cache_save_interval = self.bootstrap_cache.as_ref().and_then(|cache| {
            if cache.config().disable_cache_writing {
                None
            } else {
                // Jitter the save period by up to 10% to avoid synchronized writes.
                let duration =
                    Self::duration_with_variance(cache.config().min_cache_save_duration, 10);
                Some(interval(duration))
            }
        });
        if let Some(interval) = bootstrap_cache_save_interval.as_mut() {
            // Consume the immediate first tick here as well.
            interval.tick().await;
            info!(
                "Bootstrap cache save interval is set to {:?}",
                interval.period()
            );
        }

        // Wraps 0..=255; passed to the discovery routine each tick.
        let mut round_robin_index = 0;
        // Holds a deferred IncomingConnectionError event (see the swarm_event arm).
        let mut previous_incoming_connection_error_event = None;
        loop {
            tokio::select! {
                // `biased` makes the arms below be polled in declaration order,
                // so commands take priority over swarm events and timers.
                biased;

                local_cmd = self.local_cmd_receiver.recv() => match local_cmd {
                    Some(cmd) => {
                        let start = Instant::now();
                        let cmd_string = format!("{cmd:?}");
                        if let Err(err) = self.handle_local_cmd(cmd) {
                            warn!("Error while handling local cmd: {err}");
                        }
                        trace!("LocalCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                    },
                    None => continue,
                },
                some_cmd = self.network_cmd_receiver.recv() => match some_cmd {
                    Some(cmd) => {
                        let start = Instant::now();
                        let cmd_string = format!("{cmd:?}");
                        if let Err(err) = self.handle_network_cmd(cmd) {
                            warn!("Error while handling cmd: {err}");
                        }
                        trace!("SwarmCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                    },
                    None => continue,
                },
                result = shutdown_rx.changed() => {
                    // Exit when the flag is set, or when the sender side is dropped.
                    if result.is_ok() && *shutdown_rx.borrow() || result.is_err() {
                        info!("Shutdown signal received or sender dropped. Exiting swarm driver loop.");
                        break;
                    }
                },
                swarm_event = self.swarm.select_next_some() => {
                    // An IncomingConnectionError is held back for one iteration and
                    // processed AFTER the event that follows it — NOTE(review):
                    // presumably so the follow-up event provides context first; confirm.
                    if let Some(previous_event) = previous_incoming_connection_error_event.take() {
                        if let Err(err) = self.handle_swarm_events(swarm_event) {
                            warn!("Error while handling swarm event: {err}");
                        }
                        if let Err(err) = self.handle_swarm_events(previous_event) {
                            warn!("Error while handling swarm event: {err}");
                        }
                        continue;
                    }
                    if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) {
                        previous_incoming_connection_error_event = Some(swarm_event);
                        continue;
                    }

                    if let Err(err) = self.handle_swarm_events(swarm_event) {
                        warn!("Error while handling swarm event: {err}");
                    }
                },
                _ = dial_queue_check_interval.tick() => {
                    // Dial every queued peer whose scheduled time has passed, then
                    // drop them from the queue regardless of dial outcome.
                    let now = Instant::now();
                    let mut to_remove = vec![];
                    for (peer_id, (addrs, wait_time, _resets)) in self.dial_queue.iter() {
                        if now > *wait_time {
                            info!("Dialing peer {peer_id:?} from dial queue with addresses {addrs:?}");
                            to_remove.push(*peer_id);
                            if let Err(err) = self.swarm.dial(
                                DialOpts::peer_id(*peer_id)
                                    .condition(PeerCondition::NotDialing)
                                    .addresses(addrs.0.clone())
                                    .build(),
                            ) {
                                warn!(%peer_id, ?addrs, "dialing error: {err:?}");
                            }
                        }
                    }

                    for peer_id in to_remove.iter() {
                        self.dial_queue.remove(peer_id);
                    }
                },

                Some(()) = Self::conditional_interval(&mut initial_bootstrap_trigger_check_interval) => {
                    if self.initial_bootstrap_trigger.should_trigger_initial_bootstrap() {
                        info!("Triggering initial bootstrap process. This is a one-time operation.");
                        self.initial_bootstrap.trigger_bootstrapping_process(&mut self.swarm, self.peers_in_rt);
                        // Disable this arm permanently: bootstrap is one-time.
                        initial_bootstrap_trigger_check_interval = None;
                    }
                }

                _ = network_discover_interval.tick() => {
                    round_robin_index += 1;
                    if round_robin_index > 255 {
                        round_robin_index = 0;
                    }

                    // Discovery may hand back a new interval (e.g. adaptive period).
                    if let Some(new_interval) = self.run_network_discover_continuously(network_discover_interval.period(), round_robin_index).await {
                        network_discover_interval = new_interval;
                    }

                    // Collect peers that live in buckets with spare capacity.
                    let mut peers_in_non_full_buckets = vec![];
                    for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                        let num_entires = kbucket.num_entries();
                        if num_entires >= K_VALUE.get() {
                            continue;
                        } else {
                            let peers_in_kbucket = kbucket
                                .iter()
                                .map(|peer_entry| peer_entry.node.key.into_preimage())
                                .collect::<Vec<PeerId>>();
                            peers_in_non_full_buckets.extend(peers_in_kbucket);
                        }
                    }

                    // Keep the version map bounded — NOTE(review): entries for peers in
                    // full buckets are dropped; confirm this is the intended policy.
                    self.peers_version
                        .retain(|peer_id, _version| peers_in_non_full_buckets.contains(peer_id));

                    #[cfg(feature = "open-metrics")]
                    if let Some(metrics_recorder) = &self.metrics_recorder {
                        metrics_recorder.update_node_versions(&self.peers_version);
                    }
                }
                _ = set_farthest_record_interval.tick() => {
                    let kbucket_status = self.get_kbuckets_status();
                    self.update_on_kbucket_status(&kbucket_status);
                    // Too small a network: keep the default range, skip this round.
                    if kbucket_status.estimated_network_size <= CLOSE_GROUP_SIZE {
                        info!("Not enough estimated network size {}, with {} peers_in_non_full_buckets and {} num_of_full_buckets.",
                            kbucket_status.estimated_network_size,
                            kbucket_status.peers_in_non_full_buckets,
                            kbucket_status.num_of_full_buckets);
                        continue;
                    }
                    // Estimated per-peer share of the address space, scaled to a
                    // close-group-sized distance.
                    let density = U256::MAX / U256::from(kbucket_status.estimated_network_size);
                    let density_distance = density * U256::from(CLOSE_GROUP_SIZE);

                    let closest_k_peers = self.get_closest_k_local_peers_to_self();
                    // Need more than CLOSE_GROUP_SIZE + 2 known peers to measure a
                    // meaningful close-peers distance.
                    if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 2 {
                        continue;
                    }
                    let self_addr = NetworkAddress::from(self.self_peer_id);
                    let close_peers_distance = self_addr.distance(&NetworkAddress::from(closest_k_peers[CLOSE_GROUP_SIZE + 1].0));

                    // Responsible range = the larger of the density estimate and the
                    // observed distance to the (CLOSE_GROUP_SIZE + 2)-th closest peer.
                    let distance = std::cmp::max(Distance(density_distance), close_peers_distance);

                    info!("Set responsible range to {distance:?}({:?})", distance.ilog2());

                    self.swarm.behaviour_mut().kademlia.store_mut().set_responsible_distance_range(distance);
                    self.replication_fetcher.set_replication_distance_range(distance);
                }
                _ = relay_manager_reservation_interval.tick() => {
                    if let Some(relay_manager) = &mut self.relay_manager {
                        relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes)
                    }
                },
                Some(()) = Self::conditional_interval(&mut bootstrap_cache_save_interval) => {
                    let Some(current_interval) = bootstrap_cache_save_interval.as_mut() else {
                        continue;
                    };
                    let start = Instant::now();

                    if self.sync_and_flush_cache().is_err() {
                        warn!("Failed to sync and flush bootstrap cache, skipping this interval");
                        continue;
                    }

                    let Some(bootstrap_config) = self.bootstrap_cache.as_ref().map(|cache|cache.config()) else {
                        continue;
                    };
                    // Back off the save cadence until the configured maximum is reached.
                    if current_interval.period() >= bootstrap_config.max_cache_save_duration {
                        continue;
                    }

                    // Jitter the cap by up to 1% so nodes don't converge on one period.
                    let max_cache_save_duration =
                        Self::duration_with_variance(bootstrap_config.max_cache_save_duration, 1);

                    let scaled = current_interval.period().as_secs().saturating_mul(bootstrap_config.cache_save_scaling_factor);
                    let new_duration = Duration::from_secs(std::cmp::min(scaled, max_cache_save_duration.as_secs()));
                    info!("Scaling up the bootstrap cache save interval to {new_duration:?}");

                    *current_interval = interval(new_duration);
                    // Consume the new interval's immediate first tick.
                    current_interval.tick().await;

                    trace!("Bootstrap cache synced in {:?}", start.elapsed());

                },
            }
        }
    }
411
412 pub(crate) fn queue_network_swarm_cmd(&self, event: NetworkSwarmCmd) {
419 let event_sender = self.network_cmd_sender.clone();
420 let capacity = event_sender.capacity();
421
422 let _handle = spawn(async move {
424 if capacity == 0 {
425 warn!(
426 "NetworkSwarmCmd channel is full. Await capacity to send: {:?}",
427 event
428 );
429 }
430 if let Err(error) = event_sender.send(event).await {
431 error!("SwarmDriver failed to send event: {}", error);
432 }
433 });
434 }
435
436 pub(crate) fn send_event(&self, event: NetworkEvent) {
439 let event_sender = self.event_sender.clone();
440 let capacity = event_sender.capacity();
441
442 let _handle = spawn(async move {
444 if capacity == 0 {
445 warn!(
446 "NetworkEvent channel is full. Await capacity to send: {:?}",
447 event
448 );
449 }
450 if let Err(error) = event_sender.send(event).await {
451 error!("SwarmDriver failed to send event: {}", error);
452 }
453 });
454 }
455
456 pub(crate) fn get_closest_k_local_peers_to_self(&mut self) -> Vec<(PeerId, Addresses)> {
459 self.get_closest_k_local_peers_to_target(&NetworkAddress::from(self.self_peer_id), true)
460 }
461
462 pub(crate) fn get_closest_k_local_peers_to_target(
466 &mut self,
467 target: &NetworkAddress,
468 include_self: bool,
469 ) -> Vec<(PeerId, Addresses)> {
470 let num_peers = if include_self {
471 K_VALUE.get() - 1
472 } else {
473 K_VALUE.get()
474 };
475
476 let peer_ids: Vec<_> = self
477 .swarm
478 .behaviour_mut()
479 .kademlia
480 .get_closest_local_peers(&target.as_kbucket_key())
481 .map(|key| key.into_preimage())
483 .take(num_peers)
484 .collect();
485
486 if include_self {
487 std::iter::once((self.self_peer_id, Default::default()))
489 .chain(self.collect_peers_info(peer_ids))
490 .collect()
491 } else {
492 self.collect_peers_info(peer_ids)
493 }
494 }
495
496 fn collect_peers_info(&mut self, peers: Vec<PeerId>) -> Vec<(PeerId, Addresses)> {
498 let mut peers_info = vec![];
499 for peer_id in peers {
500 if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) {
501 if let Some(entry) = kbucket
502 .iter()
503 .find(|entry| entry.node.key.preimage() == &peer_id)
504 {
505 peers_info.push((peer_id, Addresses(entry.node.value.clone().into_vec())));
506 }
507 }
508 }
509
510 peers_info
511 }
512
513 pub(crate) fn log_handling(&mut self, handle_string: String, handle_time: Duration) {
516 if handle_string.is_empty() {
517 return;
518 }
519
520 match self.handling_statistics.entry(handle_string) {
521 Entry::Occupied(mut entry) => {
522 let records = entry.get_mut();
523 records.push(handle_time);
524 }
525 Entry::Vacant(entry) => {
526 entry.insert(vec![handle_time]);
527 }
528 }
529
530 self.handled_times += 1;
531
532 if self.handled_times >= 100 {
533 self.handled_times = 0;
534
535 let mut stats: Vec<(String, usize, Duration)> = self
536 .handling_statistics
537 .iter()
538 .map(|(kind, durations)| {
539 let count = durations.len();
540 let avg_time = durations.iter().sum::<Duration>() / count as u32;
541 (kind.clone(), count, avg_time)
542 })
543 .collect();
544
545 stats.sort_by(|a, b| b.1.cmp(&a.1)); trace!("SwarmDriver Handling Statistics: {:?}", stats);
548 self.handling_statistics.clear();
550 }
551 }
552
    /// Log the marker and, in metrics-enabled builds, forward it to the recorder.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder.as_ref() {
            metrics_recorder.record_from_marker(marker)
        }
    }
    /// Report the latest close-group membership to the metrics recorder
    /// (metrics-enabled builds only).
    #[cfg(feature = "open-metrics")]
    pub(crate) fn record_change_in_close_group(&self, new_close_group: Vec<PeerId>) {
        if let Some(metrics_recorder) = self.metrics_recorder.as_ref() {
            metrics_recorder.record_change_in_close_group(new_close_group);
        }
    }
569
570 pub(crate) fn listen_on(&mut self, addr: Multiaddr) -> Result<()> {
572 let id = self.swarm.listen_on(addr.clone())?;
573 info!("Listening on {id:?} with addr: {addr:?}");
574 Ok(())
575 }
576
577 pub(crate) fn sync_and_flush_cache(&mut self) -> Result<()> {
581 if let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() {
582 let config = bootstrap_cache.config().clone();
583 let mut old_cache = bootstrap_cache.clone();
584
585 if let Ok(new) = BootstrapCacheStore::new(config) {
586 self.bootstrap_cache = Some(new);
587
588 crate::time::spawn(async move {
590 if let Err(err) = old_cache.sync_and_flush_to_disk() {
591 error!("Failed to save bootstrap cache: {err}");
592 }
593 });
594 }
595 }
596 Ok(())
597 }
598
599 fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
601 let variance = duration.as_secs() as f64 * (variance as f64 / 100.0);
602
603 let random_adjustment =
604 Duration::from_secs(rand::thread_rng().gen_range(0..variance as u64));
605 if random_adjustment.as_secs() % 2 == 0 {
606 duration - random_adjustment
607 } else {
608 duration + random_adjustment
609 }
610 }
611
612 async fn conditional_interval(i: &mut Option<Interval>) -> Option<()> {
614 match i {
615 Some(i) => {
616 i.tick().await;
617 Some(())
618 }
619 None => None,
620 }
621 }
622}
623
#[cfg(test)]
mod tests {
    use std::time::Duration;

    /// `duration_with_variance` must always stay within ±variance% of the input.
    #[tokio::test]
    async fn test_duration_variance_fn() {
        let base = Duration::from_secs(150);
        let variance = 10;
        // 10% of 150s.
        let max_adjustment = Duration::from_secs(15);
        let allowed = (base - max_adjustment)..=(base + max_adjustment);
        for _ in 0..10000 {
            let new_duration = crate::SwarmDriver::duration_with_variance(base, variance);
            println!("new_duration: {new_duration:?}");
            if !allowed.contains(&new_duration) {
                panic!("new_duration: {new_duration:?} is not within the expected range",);
            }
        }
    }
}