kaspa_addressmanager/
lib.rs

1mod port_mapping_extender;
2mod stores;
3extern crate self as address_manager;
4
5use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc, time::Duration};
6
7use address_manager::port_mapping_extender::Extender;
8use igd_next::{
9    self as igd, aio::tokio::Tokio, AddAnyPortError, AddPortError, Gateway, GetExternalIpError, GetGenericPortMappingEntryError,
10    SearchError,
11};
12use itertools::{
13    Either::{Left, Right},
14    Itertools,
15};
16use kaspa_consensus_core::config::Config;
17use kaspa_core::{debug, info, task::tick::TickService, time::unix_now, warn};
18use kaspa_database::prelude::{CachePolicy, StoreResultExtensions, DB};
19use kaspa_utils::networking::IpAddress;
20use local_ip_address::list_afinet_netifas;
21use parking_lot::Mutex;
22use stores::banned_address_store::{BannedAddressesStore, BannedAddressesStoreReader, ConnectionBanTimestamp, DbBannedAddressesStore};
23use thiserror::Error;
24
25pub use stores::NetAddress;
26
/// Upper bound on the number of addresses kept by the address store; also used as the
/// entry count of the banned-address store cache.
const MAX_ADDRESSES: usize = 4096;
/// An address whose failure count would exceed this value is removed from the store.
const MAX_CONNECTION_FAILED_COUNT: u64 = 3;

/// Lease duration, in seconds, requested for UPnP port mappings.
const UPNP_DEADLINE_SEC: u64 = 2 * 60;
/// Period, in seconds, handed to the UPnP extender (half of the lease duration).
const UPNP_EXTEND_PERIOD: u64 = UPNP_DEADLINE_SEC / 2;

/// The name used as description when registering the UPnP service
pub(crate) const UPNP_REGISTRATION_NAME: &str = "rusty-kaspa";
35
/// Details of a successfully registered UPnP port mapping, returned by `AddressManager::upnp`
/// so the caller can build the lease extender.
struct ExtendHelper {
    /// Gateway on which the port mapping was registered.
    gateway: Gateway,
    /// Local socket address the mapping forwards to.
    local_addr: SocketAddr,
    /// External port mapped on the gateway.
    external_port: u16,
}
41
/// Errors that can occur while registering a UPnP port mapping.
///
/// Every variant transparently wraps the corresponding `igd_next` error.
#[derive(Error, Debug)]
pub enum UpnpError {
    /// Mapping a specific external port failed.
    #[error(transparent)]
    AddPortError(#[from] AddPortError),
    /// Mapping a gateway-chosen external port failed.
    #[error(transparent)]
    AddAnyPortError(#[from] AddAnyPortError),
    /// Searching for a gateway failed.
    #[error(transparent)]
    SearchError(#[from] SearchError),
    /// Querying the gateway for its external ip failed.
    #[error(transparent)]
    GetExternalIpError(#[from] GetExternalIpError),
}
53
/// Keeps track of known peer addresses, banned ips and this node's own local addresses.
pub struct AddressManager {
    /// Persistent store of banned ips and the time each ban was recorded.
    banned_address_store: DbBannedAddressesStore,
    /// Persistent store of known peer addresses, mirrored in memory.
    address_store: address_store_with_cache::Store,
    /// Node configuration (external ip, p2p listen address, UPnP switch, ...).
    config: Arc<Config>,
    /// Addresses of this node, collected once during construction.
    local_net_addresses: Vec<NetAddress>,
}
60
61impl AddressManager {
62    pub fn new(config: Arc<Config>, db: Arc<DB>, tick_service: Arc<TickService>) -> (Arc<Mutex<Self>>, Option<Extender>) {
63        let mut instance = Self {
64            banned_address_store: DbBannedAddressesStore::new(db.clone(), CachePolicy::Count(MAX_ADDRESSES)),
65            address_store: address_store_with_cache::new(db),
66            local_net_addresses: Vec::new(),
67            config,
68        };
69
70        let extender = instance.init_local_addresses(tick_service);
71
72        (Arc::new(Mutex::new(instance)), extender)
73    }
74
75    fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
76        self.local_net_addresses = self.local_addresses().collect();
77
78        let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp {
79            let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() {
80                Err(err) => {
81                    warn!("[UPnP] Error adding port mapping: {err}");
82                    return None;
83                }
84                Ok(None) => return None,
85                Ok(Some((net_address, extend_helper))) => (net_address, extend_helper),
86            };
87            self.local_net_addresses.push(net_address);
88
89            let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
90                addr: gateway.addr,
91                root_url: gateway.root_url,
92                control_url: gateway.control_url,
93                control_schema_url: gateway.control_schema_url,
94                control_schema: gateway.control_schema,
95                provider: Tokio,
96            };
97            Some(Extender::new(
98                tick_service,
99                Duration::from_secs(UPNP_EXTEND_PERIOD),
100                UPNP_DEADLINE_SEC,
101                gateway,
102                external_port,
103                local_addr,
104            ))
105        } else {
106            None
107        };
108
109        self.local_net_addresses.iter().for_each(|net_addr| {
110            info!("Publicly routable local address {} added to store", net_addr);
111        });
112        extender
113    }
114
115    fn local_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
116        match self.config.externalip {
117            // An external IP was passed, we will try to bind that if it's valid
118            Some(local_net_address) if local_net_address.ip.is_publicly_routable() => {
119                info!("External address is publicly routable {}", local_net_address);
120                return Left(iter::once(local_net_address));
121            }
122            Some(local_net_address) => {
123                info!("External address is not publicly routable {}", local_net_address);
124            }
125            None => {}
126        };
127
128        Right(self.routable_addresses_from_net_interfaces())
129    }
130
131    fn routable_addresses_from_net_interfaces(&self) -> impl Iterator<Item = NetAddress> + '_ {
132        // check whatever was passed as listen address (if routable)
133        // otherwise(listen_address === 0.0.0.0) check all interfaces
134        let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());
135        if listen_address.ip.is_publicly_routable() {
136            info!("Publicly routable local address found: {}", listen_address.ip);
137            Left(Left(iter::once(listen_address)))
138        } else if listen_address.ip.is_unspecified() {
139            let network_interfaces = list_afinet_netifas();
140            let Ok(network_interfaces) = network_interfaces else {
141                warn!("Error getting network interfaces: {:?}", network_interfaces);
142                return Left(Right(iter::empty()));
143            };
144            // TODO: Add Check IPv4 or IPv6 match from Go code
145            Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map(
146                |ip| {
147                    info!("Publicly routable local address found: {}", ip);
148                    NetAddress::new(ip, self.config.default_p2p_port())
149                },
150            ))
151        } else {
152            Left(Right(iter::empty()))
153        }
154    }
155
156    fn upnp(&self) -> Result<Option<(NetAddress, ExtendHelper)>, UpnpError> {
157        info!("[UPnP] Attempting to register upnp... (to disable run the node with --disable-upnp)");
158        let gateway = igd::search_gateway(Default::default())?;
159        let ip = IpAddress::new(gateway.get_external_ip()?);
160        if !ip.is_publicly_routable() {
161            info!("[UPnP] Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
162            return Ok(None);
163        }
164        info!("[UPnP] Got external ip from gateway using upnp: {ip}");
165
166        let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());
167        let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
168            SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
169        } else {
170            normalized_p2p_listen_address.into()
171        };
172
173        // If an operator runs a node and specifies a non-standard local port, it implies that they also wish to use a non-standard public address. The variable 'desired_external_port' is set to the port number from the normalized peer-to-peer listening address.
174        let desired_external_port = normalized_p2p_listen_address.port;
175        // This loop checks for existing port mappings in the UPnP-enabled gateway.
176        //
177        // The goal of this loop is to identify if the desired external port (`desired_external_port`) is
178        // already mapped to any device inside the local network. This is crucial because, in
179        // certain scenarios, gateways might not throw the `PortInUse` error but rather might
180        // silently remap the external port when there's a conflict. By iterating through the
181        // current mappings, we can make an informed decision about whether to attempt using
182        // the default port or request a new random one.
183        //
184        // The loop goes through all existing port mappings one-by-one:
185        // - If a mapping is found that uses the desired external port, the loop breaks with `already_in_use` set to true.
186        // - If the index is not valid (i.e., we've iterated through all the mappings), the loop breaks with `already_in_use` set to false.
187        // - Any other errors during fetching of port mappings are handled accordingly, but the end result is to exit the loop with the `already_in_use` flag set appropriately.
188        let mut index = 0;
189        let already_in_use = loop {
190            match gateway.get_generic_port_mapping_entry(index) {
191                Ok(entry) => {
192                    if entry.enabled && entry.external_port == desired_external_port {
193                        info!("[UPnP] Found existing mapping that uses the same external port. Description: {}, external port: {}, internal port: {}, client: {}, lease duration: {}", entry.port_mapping_description, entry.external_port, entry.internal_port, entry.internal_client, entry.lease_duration);
194                        break true;
195                    }
196                    index += 1;
197                }
198                Err(GetGenericPortMappingEntryError::ActionNotAuthorized) => {
199                    index += 1;
200                    continue;
201                }
202                Err(GetGenericPortMappingEntryError::RequestError(err)) => {
203                    warn!("[UPnP] request existing port mapping err: {:?}", err);
204                    break false;
205                }
206                Err(GetGenericPortMappingEntryError::SpecifiedArrayIndexInvalid) => break false,
207            }
208        };
209        if already_in_use {
210            let port =
211                gateway.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)?;
212            info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
213            return Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })));
214        }
215
216        match gateway.add_port(
217            igd::PortMappingProtocol::TCP,
218            desired_external_port,
219            local_addr,
220            UPNP_DEADLINE_SEC as u32,
221            UPNP_REGISTRATION_NAME,
222        ) {
223            Ok(_) => {
224                info!("[UPnP] Added port mapping to default external port: {ip}:{desired_external_port}");
225                Ok(Some((
226                    NetAddress { ip, port: desired_external_port },
227                    ExtendHelper { gateway, local_addr, external_port: desired_external_port },
228                )))
229            }
230            Err(AddPortError::PortInUse {}) => {
231                let port = gateway.add_any_port(
232                    igd::PortMappingProtocol::TCP,
233                    local_addr,
234                    UPNP_DEADLINE_SEC as u32,
235                    UPNP_REGISTRATION_NAME,
236                )?;
237                info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
238                Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })))
239            }
240            Err(err) => Err(err.into()),
241        }
242    }
243
244    pub fn best_local_address(&mut self) -> Option<NetAddress> {
245        if self.local_net_addresses.is_empty() {
246            None
247        } else {
248            // TODO: Add logic for finding the best as a function of a peer remote address.
249            // for now, returning the first one
250            Some(self.local_net_addresses[0])
251        }
252    }
253
254    pub fn add_address(&mut self, address: NetAddress) {
255        if address.ip.is_loopback() || address.ip.is_unspecified() {
256            debug!("[Address manager] skipping local address {}", address.ip);
257            return;
258        }
259
260        if self.address_store.has(address) {
261            return;
262        }
263
264        // We mark `connection_failed_count` as 0 only after first success
265        self.address_store.set(address, 1);
266    }
267
268    pub fn mark_connection_failure(&mut self, address: NetAddress) {
269        if !self.address_store.has(address) {
270            return;
271        }
272
273        let new_count = self.address_store.get(address).connection_failed_count + 1;
274        if new_count > MAX_CONNECTION_FAILED_COUNT {
275            self.address_store.remove(address);
276        } else {
277            self.address_store.set(address, new_count);
278        }
279    }
280
281    pub fn mark_connection_success(&mut self, address: NetAddress) {
282        if !self.address_store.has(address) {
283            return;
284        }
285
286        self.address_store.set(address, 0);
287    }
288
    /// Iterates over all addresses currently held by the address store.
    pub fn iterate_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
        self.address_store.iterate_addresses()
    }
292
    /// Iterates over stored addresses in weighted random order, skipping those in `exceptions`.
    ///
    /// Delegates to the address store, which defines the weighting.
    pub fn iterate_prioritized_random_addresses(&self, exceptions: HashSet<NetAddress>) -> impl ExactSizeIterator<Item = NetAddress> {
        self.address_store.iterate_prioritized_random_addresses(exceptions)
    }
296
    /// Bans `ip`: stores the current time as its ban timestamp and removes every stored
    /// address with this ip.
    ///
    /// Panics if writing to the banned-address store fails.
    pub fn ban(&mut self, ip: IpAddress) {
        self.banned_address_store.set(ip.into(), ConnectionBanTimestamp(unix_now())).unwrap();
        self.address_store.remove_by_ip(ip.into());
    }
301
    /// Removes `ip` from the banned-address store.
    ///
    /// Panics if the removal from the banned-address store fails.
    pub fn unban(&mut self, ip: IpAddress) {
        self.banned_address_store.remove(ip.into()).unwrap();
    }
305
306    pub fn is_banned(&mut self, ip: IpAddress) -> bool {
307        const MAX_BANNED_TIME: u64 = 24 * 60 * 60 * 1000;
308        match self.banned_address_store.get(ip.into()).unwrap_option() {
309            Some(timestamp) => {
310                if unix_now() - timestamp.0 > MAX_BANNED_TIME {
311                    self.unban(ip);
312                    false
313                } else {
314                    true
315                }
316            }
317            None => false,
318        }
319    }
320
321    pub fn get_all_addresses(&self) -> Vec<NetAddress> {
322        self.address_store.iterate_addresses().collect_vec()
323    }
324
325    pub fn get_all_banned_addresses(&self) -> Vec<IpAddress> {
326        self.banned_address_store.iterator().map(|x| IpAddress::from(x.unwrap().0)).collect_vec()
327    }
328}
329
330mod address_store_with_cache {
331    // Since we need operations such as iterating all addresses, count, etc, we keep an easy to use copy of the database addresses.
332    // We don't expect it to be expensive since we limit the number of saved addresses.
333    use std::{
334        collections::{HashMap, HashSet},
335        net::IpAddr,
336        sync::Arc,
337    };
338
339    use itertools::Itertools;
340    use kaspa_database::prelude::{CachePolicy, DB};
341    use kaspa_utils::networking::PrefixBucket;
342    use rand::{
343        distributions::{WeightedError, WeightedIndex},
344        prelude::Distribution,
345    };
346
347    use crate::{
348        stores::{
349            address_store::{AddressesStore, DbAddressesStore, Entry},
350            AddressKey,
351        },
352        NetAddress, MAX_ADDRESSES, MAX_CONNECTION_FAILED_COUNT,
353    };
354
    /// Address store that mirrors the database contents in an in-memory map.
    pub struct Store {
        /// Persistent backing store; every mutation is written through to it.
        db_store: DbAddressesStore,
        /// In-memory copy of all stored entries, keyed by address key.
        addresses: HashMap<AddressKey, Entry>,
    }
359
360    impl Store {
361        fn new(db: Arc<DB>) -> Self {
362            // We manage the cache ourselves on this level, so we disable the inner builtin cache
363            let db_store = DbAddressesStore::new(db, CachePolicy::Empty);
364            let mut addresses = HashMap::new();
365            for (key, entry) in db_store.iterator().map(|res| res.unwrap()) {
366                addresses.insert(key, entry);
367            }
368
369            Self { db_store, addresses }
370        }
371
372        pub fn has(&mut self, address: NetAddress) -> bool {
373            self.addresses.contains_key(&address.into())
374        }
375
376        pub fn set(&mut self, address: NetAddress, connection_failed_count: u64) {
377            let entry = match self.addresses.get(&address.into()) {
378                Some(entry) => Entry { connection_failed_count, address: entry.address },
379                None => Entry { connection_failed_count, address },
380            };
381            self.db_store.set(address.into(), entry).unwrap();
382            self.addresses.insert(address.into(), entry);
383            self.keep_limit();
384        }
385
386        fn keep_limit(&mut self) {
387            while self.addresses.len() > MAX_ADDRESSES {
388                let to_remove =
389                    self.addresses.iter().max_by(|a, b| (a.1).connection_failed_count.cmp(&(b.1).connection_failed_count)).unwrap();
390                self.remove_by_key(*to_remove.0);
391            }
392        }
393
        /// Returns a copy of the entry stored for `address`.
        ///
        /// Panics if `address` is not in the store.
        pub fn get(&self, address: NetAddress) -> Entry {
            *self.addresses.get(&address.into()).unwrap()
        }
397
        /// Removes the entry of `address` from the in-memory map and the database.
        pub fn remove(&mut self, address: NetAddress) {
            self.remove_by_key(address.into())
        }
401
        /// Removes `key` from the in-memory map and then from the database.
        ///
        /// Panics if the database removal fails.
        fn remove_by_key(&mut self, key: AddressKey) {
            self.addresses.remove(&key);
            self.db_store.remove(key).unwrap()
        }
406
        /// Iterates over the addresses of all in-memory entries (in the map's iteration order).
        pub fn iterate_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
            self.addresses.values().map(|entry| entry.address)
        }
410
411        /// This iterator functions as the node's ip routing selection algo.
412        /// It first adjusts in respect to the number of connection failures of each ip address,
413        /// whereby each connection failure (up to [`MAX_CONNECTION_FAILED_COUNT`]) reduces an ip's selection weight by a factor of 64,
414        /// Afterwards the weights are normalized uniformly over the ip's [`PrefixBucket`] size.
415        ///
416        /// This ensures a distributed selection across the global network, while respecting
417        /// weight reductions due to ip connection failures.
418        ///
419        /// The exact weight formula for any given ip, is as follows:
420        ///```ignore
421        ///         ip_weight = (64 ^ (x - y)) / n
422        ///
423        ///             whereby:
424        ///                 x: max allowed connection failures.
425        ///                 y: connection failures of the ip.
426        ///                 n: number of ips with the same prefix bytes.
427        ///```
428        pub fn iterate_prioritized_random_addresses(
429            &self,
430            exceptions: HashSet<NetAddress>,
431        ) -> impl ExactSizeIterator<Item = NetAddress> {
432            let exceptions: HashSet<AddressKey> = exceptions.into_iter().map(|addr| addr.into()).collect();
433            let mut prefix_counter: HashMap<PrefixBucket, usize> = HashMap::new();
434            let (mut weights, filtered_addresses): (Vec<f64>, Vec<NetAddress>) = self
435                .addresses
436                .iter()
437                .filter(|(addr_key, _)| !exceptions.contains(addr_key))
438                .map(|(_, e)| {
439                    let count = prefix_counter.entry(e.address.prefix_bucket()).or_insert(0);
440                    *count += 1;
441                    (64f64.powf((MAX_CONNECTION_FAILED_COUNT + 1 - e.connection_failed_count) as f64), e.address)
442                })
443                .unzip();
444
445            // Divide weights by size of bucket of the prefix bytes, to partially uniform the distribution over prefix buckets.
446            for (i, address) in filtered_addresses.iter().enumerate() {
447                *weights.get_mut(i).unwrap() /= *prefix_counter.get(&address.prefix_bucket()).unwrap() as f64;
448            }
449
450            RandomWeightedIterator::new(weights, filtered_addresses)
451        }
452
453        pub fn remove_by_ip(&mut self, ip: IpAddr) {
454            for key in self.addresses.keys().filter(|key| key.is_ip(ip)).copied().collect_vec() {
455                self.remove_by_key(key);
456            }
457        }
458    }
459
    /// Creates a [`Store`] over `db`; module-level entry point since `Store::new` is private.
    pub fn new(db: Arc<DB>) -> Store {
        Store::new(db)
    }
463
    /// Iterator yielding addresses in weighted random order, each positively weighted address once.
    pub struct RandomWeightedIterator {
        /// Sampling distribution over `addresses`; `None` once nothing is left to yield.
        weighted_index: Option<WeightedIndex<f64>>,
        /// Number of addresses still to be yielded.
        remaining: usize,
        /// Candidate addresses, index-aligned with the weights of `weighted_index`.
        addresses: Vec<NetAddress>,
    }
469
470    impl RandomWeightedIterator {
471        pub fn new(weights: Vec<f64>, addresses: Vec<NetAddress>) -> Self {
472            assert_eq!(weights.len(), addresses.len());
473            let remaining = weights.iter().filter(|&&w| w > 0.0).count();
474            let weighted_index = match WeightedIndex::new(weights) {
475                Ok(index) => Some(index),
476                Err(WeightedError::NoItem) => None,
477                Err(e) => panic!("{e}"),
478            };
479            Self { weighted_index, remaining, addresses }
480        }
481    }
482
483    impl Iterator for RandomWeightedIterator {
484        type Item = NetAddress;
485
486        fn next(&mut self) -> Option<Self::Item> {
487            if let Some(weighted_index) = self.weighted_index.as_mut() {
488                let i = weighted_index.sample(&mut rand::thread_rng());
489                // Zero the selected address entry
490                match weighted_index.update_weights(&[(i, &0f64)]) {
491                    Ok(_) => {}
492                    Err(WeightedError::AllWeightsZero) => self.weighted_index = None,
493                    Err(e) => panic!("{e}"),
494                }
495                self.remaining -= 1;
496                if self.remaining == 0 {
497                    self.weighted_index = None;
498                }
499                Some(self.addresses[i])
500            } else {
501                None
502            }
503        }
504
505        fn size_hint(&self) -> (usize, Option<usize>) {
506            (self.remaining, Some(self.remaining))
507        }
508    }
509
    // `size_hint` reports `remaining` as both bounds, so the default `len()` is used as is.
    impl ExactSizeIterator for RandomWeightedIterator {}
511
512    #[cfg(test)]
513    mod tests {
514        use std::str::FromStr;
515
516        use super::*;
517        use address_manager::AddressManager;
518        use kaspa_consensus_core::config::{params::SIMNET_PARAMS, Config};
519        use kaspa_core::task::tick::TickService;
520        use kaspa_database::create_temp_db;
521        use kaspa_database::prelude::ConnBuilder;
522        use kaspa_utils::networking::IpAddress;
523        use statest::ks::KSTest;
524        use statrs::distribution::Uniform;
525        use std::net::{IpAddr, Ipv6Addr};
526
527        #[test]
528        fn test_weighted_iterator() {
529            let address = NetAddress::new(IpAddr::V6(Ipv6Addr::LOCALHOST).into(), 1);
530            let iter = RandomWeightedIterator::new(vec![0.2, 0.3, 0.0], vec![address, address, address]);
531            assert_eq!(iter.len(), 2);
532            assert_eq!(iter.count(), 2);
533
534            let iter = RandomWeightedIterator::new(vec![], vec![]);
535            assert_eq!(iter.len(), 0);
536            assert_eq!(iter.count(), 0);
537        }
538
        /// Fills the address manager with prefix buckets of geometrically shrinking size and
        /// checks, via repeated Kolmogorov–Smirnov tests, that the prioritized random iterator
        /// samples prefix buckets approximately uniformly.
        #[test]
        fn test_network_distribution_weighting() {
            kaspa_core::log::try_init_logger("info");

            // Variables to initialize ip generation with.
            let largest_bucket: u16 = 2048;
            let bucket_reduction_ratio: f64 = 2.;

            // Assert that initial distribution is skewed, and hence not uniform from the outset.
            assert!(bucket_reduction_ratio >= 1.25);

            let db = create_temp_db!(ConnBuilder::default().with_files_limit(10));
            let config = Config::new(SIMNET_PARAMS);
            let (am, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default()));

            let mut am_guard = am.lock();

            let mut num_of_buckets = 0;
            let mut num_of_addresses = 0;
            let mut current_bucket_size = largest_bucket;

            // Each prefix (first two ip bytes) gets `current_bucket_size` addresses; the bucket size
            // shrinks by `bucket_reduction_ratio` from one prefix to the next.
            for current_prefix_bytes in 0..u16::MAX {
                num_of_buckets += 1;
                for current_suffix_bytes in 0..current_bucket_size {
                    let current_ip_bytes =
                        [current_prefix_bytes.to_be_bytes(), current_suffix_bytes.to_be_bytes()].concat().to_owned();
                    am_guard.add_address(NetAddress::new(
                        IpAddress::from_str(&format!(
                            "{0}.{1}.{2}.{3}",
                            current_ip_bytes[0], current_ip_bytes[1], current_ip_bytes[2], current_ip_bytes[3]
                        ))
                        .unwrap(),
                        16111,
                    ));
                    num_of_addresses += 1;
                }

                let last_bucket_size = current_bucket_size;
                current_bucket_size = ((current_bucket_size as f64) * (1.0 / bucket_reduction_ratio)).round() as u16;

                if current_bucket_size == last_bucket_size || current_bucket_size == 0 || current_prefix_bytes == u16::MAX {
                    // Address generation exhausted - exit loop
                    break;
                }
            }
            drop(am_guard);

            // Assert sample size is large enough.
            assert!(1024 <= num_of_addresses);
            // Assert we don't over-generate the address manager's limit.
            assert!(num_of_addresses <= MAX_ADDRESSES);
            // Assert that the test has enough buckets to sample from
            assert!(num_of_buckets >= 12);

            // Run multiple Kolmogorov–Smirnov tests to offset random noise of the random weighted iterator
            let num_of_trials = 512;
            let mut cul_p = 0.;
            // The target uniform distribution
            let target_u_dist = Uniform::new(0.0, (num_of_buckets) as f64).unwrap();
            for _ in 0..num_of_trials {
                // The weight-sampled distribution, which is expected to be uniform
                let prioritized_address_distribution = am
                    .lock()
                    .iterate_prioritized_random_addresses(HashSet::new())
                    .take(num_of_buckets)
                    .map(|addr| addr.prefix_bucket().as_u64() as f64)
                    .collect_vec();

                let ks_test = KSTest::new(prioritized_address_distribution.as_slice());
                cul_p += ks_test.ks1(&target_u_dist).0;
            }

            // Normalize and adjust p to test for uniformity, over average of all trials.
            let adjusted_p = (0.5 - cul_p / num_of_trials as f64).abs();
            // Define the significance threshold.
            let significance = 0.10;

            // Display and assert the result
            kaspa_core::info!(
                "Kolmogorov–Smirnov test result for weighted network distribution uniformity: p = {0:.4} (p < {1})",
                adjusted_p,
                significance
            );
            assert!(adjusted_p <= significance)
        }
624    }
625}