ant_networking/driver.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

#[cfg(feature = "open-metrics")]
use crate::metrics::NetworkMetricsRecorder;
use crate::{
    bootstrap::{InitialBootstrap, InitialBootstrapTrigger, INITIAL_BOOTSTRAP_CHECK_INTERVAL},
    circular_vec::CircularVec,
    cmd::{LocalSwarmCmd, NetworkSwarmCmd},
    driver::kad::U256,
    error::Result,
    event::{NetworkEvent, NodeEvent},
    external_address::ExternalAddressManager,
    log_markers::Marker,
    network_discovery::{NetworkDiscovery, NETWORK_DISCOVER_INTERVAL},
    relay_manager::RelayManager,
    replication_fetcher::ReplicationFetcher,
    time::{interval, spawn, Instant, Interval},
    Addresses, NodeIssue, NodeRecordStore, CLOSE_GROUP_SIZE,
};
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::PaymentQuote;
use ant_protocol::messages::ConnectionInfo;
use ant_protocol::{
    messages::{Request, Response},
    NetworkAddress,
};
use futures::StreamExt;
use libp2p::{
    kad::{self, KBucketDistance as Distance, QueryId, K_VALUE},
    request_response::OutboundRequestId,
    swarm::{
        dial_opts::{DialOpts, PeerCondition},
        ConnectionId, Swarm,
    },
    Multiaddr, PeerId,
};
use libp2p::{
    request_response,
    swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent},
};
use rand::Rng;
use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::Duration;
use tracing::warn;

/// The issues tracked per node (capped at 10 per node to avoid memory leaks),
/// together with a boolean flag indicating whether the node is considered bad.
pub(crate) type BadNodes = BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>;

/// Interval over which we check for the farthest record we _should_ be holding
/// based upon our knowledge of the CLOSE_GROUP.
pub(crate) const CLOSEST_RECORD_CHECK_INTERVAL: Duration = Duration::from_secs(15);

/// Interval over which we query the relay manager to check if we can make any more reservations.
pub(crate) const RELAY_MANAGER_RESERVATION_INTERVAL: Duration = Duration::from_secs(30);

/// Interval over which we check if we can dial any peer in the dial queue.
const DIAL_QUEUE_CHECK_INTERVAL: Duration = Duration::from_secs(2);

/// The ways in which the Get Closest queries are used.
pub(crate) enum PendingGetClosestType {
    /// The network discovery method is present at the networking layer,
    /// so we can process the queries made by NetworkDiscovery without using any channels.
    NetworkDiscovery,
    /// These are queries made by a function at the upper layers and contain a channel to send the result back.
    FunctionCall(oneshot::Sender<Vec<(PeerId, Addresses)>>),
}
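/// Tracks in-flight `get_closest_peers` queries by their kad `QueryId`.
/// The Vec accumulates the `(PeerId, Addresses)` results discovered while the
/// query runs; for the `FunctionCall` variant, the accumulated list is sent
/// back through the oneshot channel once the query completes.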
type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, Vec<(PeerId, Addresses)>)>;

impl From<std::convert::Infallible> for NodeEvent {
    fn from(_: std::convert::Infallible) -> Self {
        panic!("NodeBehaviour is not Infallible!")
    }
}

/// The behaviors are polled in the order they are defined.
/// The first struct member is polled until it returns Poll::Pending before moving on to later members.
/// Prioritize the behaviors related to connection handling.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NodeEvent")]
pub(super) struct NodeBehaviour {
    pub(super) blocklist:
        libp2p::allow_block_list::Behaviour<libp2p::allow_block_list::BlockedPeers>,
    pub(super) do_not_disturb: crate::behaviour::do_not_disturb::Behaviour,
    pub(super) identify: libp2p::identify::Behaviour,
    pub(super) upnp: Toggle<libp2p::upnp::tokio::Behaviour>,
    pub(super) relay_client: libp2p::relay::client::Behaviour,
    pub(super) relay_server: Toggle<libp2p::relay::Behaviour>,
    pub(super) kademlia: kad::Behaviour<NodeRecordStore>,
    pub(super) request_response: request_response::cbor::Behaviour<Request, Response>,
}

pub struct SwarmDriver {
    pub(crate) swarm: Swarm<NodeBehaviour>,
    pub(crate) self_peer_id: PeerId,
    /// When true, we don't filter our local addresses.
    pub(crate) local: bool,
    pub(crate) is_relay_client: bool,
    /// The peers that are closer to our PeerId. Includes self.
    #[cfg(feature = "open-metrics")]
    pub(crate) close_group: Vec<PeerId>,
    pub(crate) peers_in_rt: usize,
    pub(crate) initial_bootstrap: InitialBootstrap,
    pub(crate) initial_bootstrap_trigger: InitialBootstrapTrigger,
    pub(crate) network_discovery: NetworkDiscovery,
    pub(crate) bootstrap_cache: Option<BootstrapCacheStore>,
    pub(crate) external_address_manager: Option<ExternalAddressManager>,
    pub(crate) relay_manager: Option<RelayManager>,
    /// The peers that are using our relay service.
    pub(crate) connected_relay_clients: HashSet<PeerId>,
    pub(crate) replication_fetcher: ReplicationFetcher,
    #[cfg(feature = "open-metrics")]
    pub(crate) metrics_recorder: Option<NetworkMetricsRecorder>,

    pub(crate) network_cmd_sender: mpsc::Sender<NetworkSwarmCmd>,
    pub(crate) local_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
    pub(crate) local_cmd_receiver: mpsc::Receiver<LocalSwarmCmd>,
    pub(crate) network_cmd_receiver: mpsc::Receiver<NetworkSwarmCmd>,
    pub(crate) event_sender: mpsc::Sender<NetworkEvent>, // Use `self.send_event()` to send a NetworkEvent.

    /// Trackers for underlying behaviour related events.
    pub(crate) pending_get_closest_peers: PendingGetClosest,
    #[allow(clippy::type_complexity)]
    pub(crate) pending_requests: HashMap<
        OutboundRequestId,
        Option<oneshot::Sender<Result<(Response, Option<ConnectionInfo>)>>>,
    >,
    /// A list of the most recent peers we have dialed ourselves. Old dialed peers are evicted once the vec fills up.
    pub(crate) dialed_peers: CircularVec<PeerId>,
    pub(crate) dial_queue: HashMap<PeerId, (Addresses, Instant, usize)>,
    /// Peers we have live connections to. Any peer contacted during a kad network query
    /// will have a live connection established, and may not appear in the RT.
    pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Multiaddr, Instant)>,
    /// The list of recently established connection ids.
    /// This is used to prevent log spamming.
    pub(crate) latest_established_connection_ids: HashMap<usize, (Multiaddr, Instant)>,
    /// Records the handling times of recent operations, keyed by handling kind.
    pub(crate) handling_statistics: BTreeMap<String, Vec<Duration>>,
    pub(crate) handled_times: usize,
    pub(crate) hard_disk_write_error: usize,
    pub(crate) bad_nodes: BadNodes,
    pub(crate) quotes_history: BTreeMap<PeerId, PaymentQuote>,
    pub(crate) replication_targets: BTreeMap<PeerId, Instant>,
    /// When the last replication event occurred.
    /// This allows us to throttle replication no matter how it is triggered.
    pub(crate) last_replication: Option<Instant>,
    /// When the last pruning of outdated connections was undertaken.
    pub(crate) last_connection_pruning_time: Instant,
    /// Records the versions of peers that are in non-full kbuckets.
    pub(crate) peers_version: HashMap<PeerId, String>,
}

impl SwarmDriver {
    /// Asynchronously drives the swarm event loop, handling events from both
    /// the swarm and the command receivers. This function runs until a
    /// shutdown signal is received.
    ///
    /// The `tokio::select` macro is used to concurrently process swarm events
    /// and command receiver messages, ensuring efficient handling of multiple
    /// asynchronous tasks.
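    ///
    /// # Example (sketch)
    ///
    /// A minimal sketch of how this loop is typically launched. Constructing
    /// the `SwarmDriver` itself (here `swarm_driver`) is assumed to happen
    /// elsewhere and is not shown.
    ///
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    /// // Drive the swarm on its own task; the loop exits once `true` is sent
    /// // on the shutdown channel or the sender is dropped.
    /// let handle = tokio::spawn(swarm_driver.run(shutdown_rx));
    /// // ... later, request a clean shutdown:
    /// let _ = shutdown_tx.send(true);
    /// ```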
    pub async fn run(mut self, mut shutdown_rx: watch::Receiver<bool>) {
        let mut network_discover_interval = interval(NETWORK_DISCOVER_INTERVAL);
        let mut set_farthest_record_interval = interval(CLOSEST_RECORD_CHECK_INTERVAL);
        let mut relay_manager_reservation_interval = interval(RELAY_MANAGER_RESERVATION_INTERVAL);
        let mut initial_bootstrap_trigger_check_interval =
            Some(interval(INITIAL_BOOTSTRAP_CHECK_INTERVAL));
        let mut dial_queue_check_interval = interval(DIAL_QUEUE_CHECK_INTERVAL);
        dial_queue_check_interval.tick().await; // first tick completes immediately

        let mut bootstrap_cache_save_interval = self.bootstrap_cache.as_ref().and_then(|cache| {
            if cache.config().disable_cache_writing {
                None
            } else {
                // Add a variance of 10% to the interval, to avoid all nodes writing to disk at the same time.
                let duration =
                    Self::duration_with_variance(cache.config().min_cache_save_duration, 10);
                Some(interval(duration))
            }
        });
        if let Some(interval) = bootstrap_cache_save_interval.as_mut() {
            interval.tick().await; // first tick completes immediately
            info!(
                "Bootstrap cache save interval is set to {:?}",
                interval.period()
            );
        }

        let mut round_robin_index = 0;
        // Temporarily skip processing IncomingConnectionError swarm events to avoid log spamming.
        let mut previous_incoming_connection_error_event = None;
        loop {
            tokio::select! {
                // Polls futures in the order they appear here (as opposed to randomly).
                biased;

                // Prioritise any local cmds pending.
                // https://github.com/libp2p/rust-libp2p/blob/master/docs/coding-guidelines.md#prioritize-local-work-over-new-work-from-a-remote
                local_cmd = self.local_cmd_receiver.recv() => match local_cmd {
                    Some(cmd) => {
                        let start = Instant::now();
                        let cmd_string = format!("{cmd:?}");
                        if let Err(err) = self.handle_local_cmd(cmd) {
                            warn!("Error while handling local cmd: {err}");
                        }
                        trace!("LocalCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                    },
                    None => continue,
                },
                // Next, check if we have locally generated network cmds.
                some_cmd = self.network_cmd_receiver.recv() => match some_cmd {
                    Some(cmd) => {
                        let start = Instant::now();
                        let cmd_string = format!("{cmd:?}");
                        if let Err(err) = self.handle_network_cmd(cmd) {
                            warn!("Error while handling cmd: {err}");
                        }
                        trace!("SwarmCmd handled in {:?}: {cmd_string:?}", start.elapsed());
                    },
                    None => continue,
                },
                // Check for a shutdown command.
                result = shutdown_rx.changed() => {
                    if (result.is_ok() && *shutdown_rx.borrow()) || result.is_err() {
                        info!("Shutdown signal received or sender dropped. Exiting swarm driver loop.");
                        break;
                    }
                },
                // Next, take and react to external swarm events.
                swarm_event = self.swarm.select_next_some() => {
                    // Refer to handle_swarm_events::IncomingConnectionError for more info on why we skip
                    // processing the event for one round.
                    if let Some(previous_event) = previous_incoming_connection_error_event.take() {
                        if let Err(err) = self.handle_swarm_events(swarm_event) {
                            warn!("Error while handling swarm event: {err}");
                        }
                        if let Err(err) = self.handle_swarm_events(previous_event) {
                            warn!("Error while handling swarm event: {err}");
                        }
                        continue;
                    }
                    if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) {
                        previous_incoming_connection_error_event = Some(swarm_event);
                        continue;
                    }

                    // Logging for handled events happens inside handle_swarm_events;
                    // otherwise we're rewriting match statements etc. around this anyway.
                    if let Err(err) = self.handle_swarm_events(swarm_event) {
                        warn!("Error while handling swarm event: {err}");
                    }
                },
                // Thereafter we can check our intervals.

                _ = dial_queue_check_interval.tick() => {
                    let now = Instant::now();
                    let mut to_remove = vec![];
                    // Check if we can dial any peer in the dial queue;
                    // if we have no peers in the dial queue, skip this check.
                    for (peer_id, (addrs, wait_time, _resets)) in self.dial_queue.iter() {
                        if now > *wait_time {
                            info!("Dialing peer {peer_id:?} from dial queue with addresses {addrs:?}");
                            to_remove.push(*peer_id);
                            if let Err(err) = self.swarm.dial(
                                DialOpts::peer_id(*peer_id)
                                    .condition(PeerCondition::NotDialing)
                                    .addresses(addrs.0.clone())
                                    .build(),
                            ) {
                                warn!(%peer_id, ?addrs, "dialing error: {err:?}");
                            }
                        }
                    }

                    for peer_id in to_remove.iter() {
                        self.dial_queue.remove(peer_id);
                    }
                },

                // Check if we can trigger the initial bootstrap process.
                // Once it is triggered, we don't re-trigger it.
                Some(()) = Self::conditional_interval(&mut initial_bootstrap_trigger_check_interval) => {
                    if self.initial_bootstrap_trigger.should_trigger_initial_bootstrap() {
                        info!("Triggering initial bootstrap process. This is a one-time operation.");
                        self.initial_bootstrap.trigger_bootstrapping_process(&mut self.swarm, self.peers_in_rt);
                        // We will not enter this branch anymore once the initial bootstrap is triggered.
                        // It should run on its own and complete.
                        initial_bootstrap_trigger_check_interval = None;
                    }
                }

                // Runs every bootstrap_interval time.
                _ = network_discover_interval.tick() => {
                    round_robin_index += 1;
                    if round_robin_index > 255 {
                        round_robin_index = 0;
                    }

                    if let Some(new_interval) = self.run_network_discover_continuously(network_discover_interval.period(), round_robin_index).await {
                        network_discover_interval = new_interval;
                    }

                    // Collect all peers in non-full kbuckets.
                    let mut peers_in_non_full_buckets = vec![];
                    for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
                        let num_entries = kbucket.num_entries();
                        if num_entries >= K_VALUE.get() {
                            continue;
                        }
                        let peers_in_kbucket = kbucket
                            .iter()
                            .map(|peer_entry| peer_entry.node.key.into_preimage())
                            .collect::<Vec<PeerId>>();
                        peers_in_non_full_buckets.extend(peers_in_kbucket);
                    }

                    // Ensure all existing node_version records are for peers in peers_in_non_full_buckets.
                    self.peers_version
                        .retain(|peer_id, _version| peers_in_non_full_buckets.contains(peer_id));

                    #[cfg(feature = "open-metrics")]
                    if let Some(metrics_recorder) = &self.metrics_recorder {
                        metrics_recorder.update_node_versions(&self.peers_version);
                    }
                }
                _ = set_farthest_record_interval.tick() => {
                    let kbucket_status = self.get_kbuckets_status();
                    self.update_on_kbucket_status(&kbucket_status);
                    if kbucket_status.estimated_network_size <= CLOSE_GROUP_SIZE {
                        info!("Not enough estimated network size {}, with {} peers_in_non_full_buckets and {} num_of_full_buckets.",
                            kbucket_status.estimated_network_size,
                            kbucket_status.peers_in_non_full_buckets,
                            kbucket_status.num_of_full_buckets);
                        continue;
                    }
                    // The entire Distance space is U256
                    // (U256::MAX is 115792089237316195423570985008687907853269984665640564039457584007913129639935).
                    // The network density (average distance among nodes) can be estimated as:
                    //     network_density = entire_U256_space / estimated_network_size
                    let density = U256::MAX / U256::from(kbucket_status.estimated_network_size);
                    let density_distance = density * U256::from(CLOSE_GROUP_SIZE);
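                    // Worked example (illustrative numbers, not taken from the code):
                    // with an estimated network size of 1,000,000 nodes and a close
                    // group of 5, density_distance ≈ U256::MAX * 5 / 1_000_000, i.e.
                    // the close group is expected to span roughly five-millionths of
                    // the keyspace.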

                    // Use the distance to a close peer to avoid the situation where
                    // the estimated density_distance is too narrow.
                    let closest_k_peers = self.get_closest_k_local_peers_to_self();
                    if closest_k_peers.len() <= CLOSE_GROUP_SIZE + 2 {
                        continue;
                    }
                    // Results are sorted, hence we can calculate the distance directly.
                    // Note: self is included.
                    let self_addr = NetworkAddress::from(self.self_peer_id);
                    let close_peers_distance = self_addr.distance(&NetworkAddress::from(closest_k_peers[CLOSE_GROUP_SIZE + 1].0));

                    let distance = std::cmp::max(Distance(density_distance), close_peers_distance);

                    info!("Set responsible range to {distance:?}({:?})", distance.ilog2());

                    // Set any new distance to the farthest record in the store.
                    self.swarm.behaviour_mut().kademlia.store_mut().set_responsible_distance_range(distance);
                    // The distance range within the replication_fetcher shall be in sync as well.
                    self.replication_fetcher.set_replication_distance_range(distance);
                }
                _ = relay_manager_reservation_interval.tick() => {
                    if let Some(relay_manager) = &mut self.relay_manager {
                        relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes)
                    }
                },
                Some(()) = Self::conditional_interval(&mut bootstrap_cache_save_interval) => {
                    let Some(current_interval) = bootstrap_cache_save_interval.as_mut() else {
                        continue;
                    };
                    let start = Instant::now();

                    if self.sync_and_flush_cache().is_err() {
                        warn!("Failed to sync and flush bootstrap cache, skipping this interval");
                        continue;
                    }

                    let Some(bootstrap_config) = self.bootstrap_cache.as_ref().map(|cache| cache.config()) else {
                        continue;
                    };
                    if current_interval.period() >= bootstrap_config.max_cache_save_duration {
                        continue;
                    }

                    // Add a variance of 1% to the max interval to avoid all nodes writing to disk at the same time.
                    let max_cache_save_duration =
                        Self::duration_with_variance(bootstrap_config.max_cache_save_duration, 1);

                    // Scale up the interval until we reach the max.
                    let scaled = current_interval.period().as_secs().saturating_mul(bootstrap_config.cache_save_scaling_factor);
                    let new_duration = Duration::from_secs(std::cmp::min(scaled, max_cache_save_duration.as_secs()));
                    info!("Scaling up the bootstrap cache save interval to {new_duration:?}");

                    *current_interval = interval(new_duration);
                    current_interval.tick().await;

                    trace!("Bootstrap cache synced in {:?}", start.elapsed());
                },
            }
        }
    }

    // --------------------------------------------
    // ---------- Crate helpers -------------------
    // --------------------------------------------

    /// Pushes a NetworkSwarmCmd off-thread so as to be non-blocking;
    /// this is a wrapper around the `mpsc::Sender::send` call.
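    ///
    /// # Example (sketch)
    ///
    /// `cmd` below stands for any `NetworkSwarmCmd` value; the send happens on
    /// a spawned task, so the event loop never blocks even when the channel is
    /// at capacity.
    ///
    /// ```ignore
    /// self.queue_network_swarm_cmd(cmd);
    /// // Control returns immediately; the cmd is delivered asynchronously.
    /// ```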
    pub(crate) fn queue_network_swarm_cmd(&self, event: NetworkSwarmCmd) {
        let event_sender = self.network_cmd_sender.clone();
        let capacity = event_sender.capacity();

        // Push the event off-thread so as to be non-blocking.
        let _handle = spawn(async move {
            if capacity == 0 {
                warn!(
                    "NetworkSwarmCmd channel is full. Await capacity to send: {:?}",
                    event
                );
            }
            if let Err(error) = event_sender.send(event).await {
                error!("SwarmDriver failed to send event: {}", error);
            }
        });
    }

    /// Sends an event after pushing it off-thread so as to be non-blocking;
    /// this is a wrapper around the `mpsc::Sender::send` call.
    pub(crate) fn send_event(&self, event: NetworkEvent) {
        let event_sender = self.event_sender.clone();
        let capacity = event_sender.capacity();

        // Push the event off-thread so as to be non-blocking.
        let _handle = spawn(async move {
            if capacity == 0 {
                warn!(
                    "NetworkEvent channel is full. Await capacity to send: {:?}",
                    event
                );
            }
            if let Err(error) = event_sender.send(event).await {
                error!("SwarmDriver failed to send event: {}", error);
            }
        });
    }

    /// Get the K closest peers to self, from our local RoutingTable.
    /// Always includes self.
    pub(crate) fn get_closest_k_local_peers_to_self(&mut self) -> Vec<(PeerId, Addresses)> {
        self.get_closest_k_local_peers_to_target(&NetworkAddress::from(self.self_peer_id), true)
    }

    /// Get the K closest peers to the target, from our local RoutingTable.
    /// Sorted by closeness to the target.
    /// If requested, self will be added as the first entry.
    pub(crate) fn get_closest_k_local_peers_to_target(
        &mut self,
        target: &NetworkAddress,
        include_self: bool,
    ) -> Vec<(PeerId, Addresses)> {
        let num_peers = if include_self {
            K_VALUE.get() - 1
        } else {
            K_VALUE.get()
        };

        let peer_ids: Vec<_> = self
            .swarm
            .behaviour_mut()
            .kademlia
            .get_closest_local_peers(&target.as_kbucket_key())
            // Map KBucketKey<PeerId> to PeerId.
            .map(|key| key.into_preimage())
            .take(num_peers)
            .collect();

        if include_self {
            // Start with our own PeerId and chain the closest.
            std::iter::once((self.self_peer_id, Default::default()))
                .chain(self.collect_peers_info(peer_ids))
                .collect()
        } else {
            self.collect_peers_info(peer_ids)
        }
    }

    /// Collect peers' address info.
    fn collect_peers_info(&mut self, peers: Vec<PeerId>) -> Vec<(PeerId, Addresses)> {
        let mut peers_info = vec![];
        for peer_id in peers {
            if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) {
                if let Some(entry) = kbucket
                    .iter()
                    .find(|entry| entry.node.key.preimage() == &peer_id)
                {
                    peers_info.push((peer_id, Addresses(entry.node.value.clone().into_vec())));
                }
            }
        }

        peers_info
    }

    /// Record one handling time.
    /// Logs a statistics summary for every 100 handled.
    pub(crate) fn log_handling(&mut self, handle_string: String, handle_time: Duration) {
        if handle_string.is_empty() {
            return;
        }

        match self.handling_statistics.entry(handle_string) {
            Entry::Occupied(mut entry) => {
                let records = entry.get_mut();
                records.push(handle_time);
            }
            Entry::Vacant(entry) => {
                entry.insert(vec![handle_time]);
            }
        }

        self.handled_times += 1;

        if self.handled_times >= 100 {
            self.handled_times = 0;

            let mut stats: Vec<(String, usize, Duration)> = self
                .handling_statistics
                .iter()
                .map(|(kind, durations)| {
                    let count = durations.len();
                    let avg_time = durations.iter().sum::<Duration>() / count as u32;
                    (kind.clone(), count, avg_time)
                })
                .collect();

            stats.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by count in descending order.

            trace!("SwarmDriver Handling Statistics: {:?}", stats);
            // Now we've logged, let's clear the stats from the btreemap.
            self.handling_statistics.clear();
        }
    }

    /// Calls Marker::log() to insert the marker into the log files.
    /// Also records the metric via the metrics recorder if the `open-metrics` feature flag is enabled.
    pub(crate) fn record_metrics(&self, marker: Marker) {
        marker.log();
        #[cfg(feature = "open-metrics")]
        if let Some(metrics_recorder) = self.metrics_recorder.as_ref() {
            metrics_recorder.record_from_marker(marker)
        }
    }

    /// Updates metrics that rely on our current close group.
    #[cfg(feature = "open-metrics")]
    pub(crate) fn record_change_in_close_group(&self, new_close_group: Vec<PeerId>) {
        if let Some(metrics_recorder) = self.metrics_recorder.as_ref() {
            metrics_recorder.record_change_in_close_group(new_close_group);
        }
    }

    /// Listen on the provided address, logging the listener id.
    pub(crate) fn listen_on(&mut self, addr: Multiaddr) -> Result<()> {
        let id = self.swarm.listen_on(addr.clone())?;
        info!("Listening on {id:?} with addr: {addr:?}");
        Ok(())
    }

    /// Sync and flush the bootstrap cache to disk.
    ///
    /// This function creates a new cache and saves the old one to disk.
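    ///
    /// # Example (sketch)
    ///
    /// Mirrors how the interval branch in `run` uses this helper:
    ///
    /// ```ignore
    /// if self.sync_and_flush_cache().is_err() {
    ///     warn!("Failed to sync and flush bootstrap cache, skipping this interval");
    /// }
    /// ```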
    pub(crate) fn sync_and_flush_cache(&mut self) -> Result<()> {
        if let Some(bootstrap_cache) = self.bootstrap_cache.as_mut() {
            let config = bootstrap_cache.config().clone();
            let mut old_cache = bootstrap_cache.clone();

            if let Ok(new) = BootstrapCacheStore::new(config) {
                self.bootstrap_cache = Some(new);

                // Save the old cache to disk, off-thread.
                crate::time::spawn(async move {
                    if let Err(err) = old_cache.sync_and_flush_to_disk() {
                        error!("Failed to save bootstrap cache: {err}");
                    }
                });
            }
        }
        Ok(())
    }

    /// Returns a new duration that is within +/- variance of the provided duration.
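    ///
    /// # Example (sketch)
    ///
    /// Mirrors the unit test at the bottom of this file: a 150s base duration
    /// with 10% variance always lands within +/- 15s of the base.
    ///
    /// ```ignore
    /// let d = SwarmDriver::duration_with_variance(Duration::from_secs(150), 10);
    /// assert!(d >= Duration::from_secs(135) && d <= Duration::from_secs(165));
    /// ```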
    fn duration_with_variance(duration: Duration, variance: u32) -> Duration {
        let variance = duration.as_secs() as f64 * (variance as f64 / 100.0);

        // `gen_range` panics on an empty range, so clamp the upper bound to at least 1.
        let random_adjustment =
            Duration::from_secs(rand::thread_rng().gen_range(0..(variance as u64).max(1)));
        if random_adjustment.as_secs() % 2 == 0 {
            duration - random_adjustment
        } else {
            duration + random_adjustment
        }
    }

    /// To tick an optional interval inside tokio::select! without looping forever.
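    ///
    /// # Example (sketch)
    ///
    /// Inside `tokio::select!` the branch is effectively disabled once the
    /// interval is set to `None`: the future then resolves to `None`, so the
    /// `Some(())` pattern never matches.
    ///
    /// ```ignore
    /// Some(()) = Self::conditional_interval(&mut maybe_interval) => {
    ///     // Runs only while `maybe_interval` is Some.
    /// }
    /// ```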
    async fn conditional_interval(i: &mut Option<Interval>) -> Option<()> {
        match i {
            Some(i) => {
                i.tick().await;
                Some(())
            }
            None => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    #[tokio::test]
    async fn test_duration_variance_fn() {
        let duration = Duration::from_secs(150);
        let variance = 10;
        let expected_variance = Duration::from_secs(15); // 10% of 150
        for _ in 0..10000 {
            let new_duration = crate::SwarmDriver::duration_with_variance(duration, variance);
            println!("new_duration: {new_duration:?}");
            if new_duration < duration - expected_variance
                || new_duration > duration + expected_variance
            {
                panic!("new_duration: {new_duration:?} is not within the expected range");
            }
        }
    }
}