1mod port_mapping_extender;
2mod stores;
3extern crate self as address_manager;
4
5use std::{collections::HashSet, iter, net::SocketAddr, sync::Arc, time::Duration};
6
7use address_manager::port_mapping_extender::Extender;
8use igd_next::{
9 self as igd, aio::tokio::Tokio, AddAnyPortError, AddPortError, Gateway, GetExternalIpError, GetGenericPortMappingEntryError,
10 SearchError,
11};
12use itertools::{
13 Either::{Left, Right},
14 Itertools,
15};
16use kaspa_consensus_core::config::Config;
17use kaspa_core::{debug, info, task::tick::TickService, time::unix_now, warn};
18use kaspa_database::prelude::{CachePolicy, StoreResultExtensions, DB};
19use kaspa_utils::networking::IpAddress;
20use local_ip_address::list_afinet_netifas;
21use parking_lot::Mutex;
22use stores::banned_address_store::{BannedAddressesStore, BannedAddressesStoreReader, ConnectionBanTimestamp, DbBannedAddressesStore};
23use thiserror::Error;
24
25pub use stores::NetAddress;
26
/// Upper bound on the number of entries kept in the address store; also used
/// as the cache size of the banned-address store.
const MAX_ADDRESSES: usize = 4096;

/// Highest connection-failure count an address may hold; a failure that would
/// push the count above this value removes the address instead.
const MAX_CONNECTION_FAILED_COUNT: u64 = 3;

/// Duration, in seconds, requested for every UPnP port mapping.
const UPNP_DEADLINE_SEC: u64 = 2 * 60;

/// Period, in seconds, handed to the port-mapping extender (half the deadline).
const UPNP_EXTEND_PERIOD: u64 = UPNP_DEADLINE_SEC / 2;

/// Description under which UPnP port mappings are registered on the gateway.
pub(crate) const UPNP_REGISTRATION_NAME: &str = "rusty-kaspa";
35
36struct ExtendHelper {
37 gateway: Gateway,
38 local_addr: SocketAddr,
39 external_port: u16,
40}
41
42#[derive(Error, Debug)]
43pub enum UpnpError {
44 #[error(transparent)]
45 AddPortError(#[from] AddPortError),
46 #[error(transparent)]
47 AddAnyPortError(#[from] AddAnyPortError),
48 #[error(transparent)]
49 SearchError(#[from] SearchError),
50 #[error(transparent)]
51 GetExternalIpError(#[from] GetExternalIpError),
52}
53
54pub struct AddressManager {
55 banned_address_store: DbBannedAddressesStore,
56 address_store: address_store_with_cache::Store,
57 config: Arc<Config>,
58 local_net_addresses: Vec<NetAddress>,
59}
60
61impl AddressManager {
62 pub fn new(config: Arc<Config>, db: Arc<DB>, tick_service: Arc<TickService>) -> (Arc<Mutex<Self>>, Option<Extender>) {
63 let mut instance = Self {
64 banned_address_store: DbBannedAddressesStore::new(db.clone(), CachePolicy::Count(MAX_ADDRESSES)),
65 address_store: address_store_with_cache::new(db),
66 local_net_addresses: Vec::new(),
67 config,
68 };
69
70 let extender = instance.init_local_addresses(tick_service);
71
72 (Arc::new(Mutex::new(instance)), extender)
73 }
74
75 fn init_local_addresses(&mut self, tick_service: Arc<TickService>) -> Option<Extender> {
76 self.local_net_addresses = self.local_addresses().collect();
77
78 let extender = if self.local_net_addresses.is_empty() && !self.config.disable_upnp {
79 let (net_address, ExtendHelper { gateway, local_addr, external_port }) = match self.upnp() {
80 Err(err) => {
81 warn!("[UPnP] Error adding port mapping: {err}");
82 return None;
83 }
84 Ok(None) => return None,
85 Ok(Some((net_address, extend_helper))) => (net_address, extend_helper),
86 };
87 self.local_net_addresses.push(net_address);
88
89 let gateway: igd_next::aio::Gateway<Tokio> = igd_next::aio::Gateway {
90 addr: gateway.addr,
91 root_url: gateway.root_url,
92 control_url: gateway.control_url,
93 control_schema_url: gateway.control_schema_url,
94 control_schema: gateway.control_schema,
95 provider: Tokio,
96 };
97 Some(Extender::new(
98 tick_service,
99 Duration::from_secs(UPNP_EXTEND_PERIOD),
100 UPNP_DEADLINE_SEC,
101 gateway,
102 external_port,
103 local_addr,
104 ))
105 } else {
106 None
107 };
108
109 self.local_net_addresses.iter().for_each(|net_addr| {
110 info!("Publicly routable local address {} added to store", net_addr);
111 });
112 extender
113 }
114
115 fn local_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
116 match self.config.externalip {
117 Some(local_net_address) if local_net_address.ip.is_publicly_routable() => {
119 info!("External address is publicly routable {}", local_net_address);
120 return Left(iter::once(local_net_address));
121 }
122 Some(local_net_address) => {
123 info!("External address is not publicly routable {}", local_net_address);
124 }
125 None => {}
126 };
127
128 Right(self.routable_addresses_from_net_interfaces())
129 }
130
131 fn routable_addresses_from_net_interfaces(&self) -> impl Iterator<Item = NetAddress> + '_ {
132 let listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());
135 if listen_address.ip.is_publicly_routable() {
136 info!("Publicly routable local address found: {}", listen_address.ip);
137 Left(Left(iter::once(listen_address)))
138 } else if listen_address.ip.is_unspecified() {
139 let network_interfaces = list_afinet_netifas();
140 let Ok(network_interfaces) = network_interfaces else {
141 warn!("Error getting network interfaces: {:?}", network_interfaces);
142 return Left(Right(iter::empty()));
143 };
144 Right(network_interfaces.into_iter().map(|(_, ip)| IpAddress::from(ip)).filter(|&ip| ip.is_publicly_routable()).map(
146 |ip| {
147 info!("Publicly routable local address found: {}", ip);
148 NetAddress::new(ip, self.config.default_p2p_port())
149 },
150 ))
151 } else {
152 Left(Right(iter::empty()))
153 }
154 }
155
156 fn upnp(&self) -> Result<Option<(NetAddress, ExtendHelper)>, UpnpError> {
157 info!("[UPnP] Attempting to register upnp... (to disable run the node with --disable-upnp)");
158 let gateway = igd::search_gateway(Default::default())?;
159 let ip = IpAddress::new(gateway.get_external_ip()?);
160 if !ip.is_publicly_routable() {
161 info!("[UPnP] Non-publicly routable external ip from gateway using upnp {} not added to store", ip);
162 return Ok(None);
163 }
164 info!("[UPnP] Got external ip from gateway using upnp: {ip}");
165
166 let normalized_p2p_listen_address = self.config.p2p_listen_address.normalize(self.config.default_p2p_port());
167 let local_addr = if normalized_p2p_listen_address.ip.is_unspecified() {
168 SocketAddr::new(local_ip_address::local_ip().unwrap(), normalized_p2p_listen_address.port)
169 } else {
170 normalized_p2p_listen_address.into()
171 };
172
173 let desired_external_port = normalized_p2p_listen_address.port;
175 let mut index = 0;
189 let already_in_use = loop {
190 match gateway.get_generic_port_mapping_entry(index) {
191 Ok(entry) => {
192 if entry.enabled && entry.external_port == desired_external_port {
193 info!("[UPnP] Found existing mapping that uses the same external port. Description: {}, external port: {}, internal port: {}, client: {}, lease duration: {}", entry.port_mapping_description, entry.external_port, entry.internal_port, entry.internal_client, entry.lease_duration);
194 break true;
195 }
196 index += 1;
197 }
198 Err(GetGenericPortMappingEntryError::ActionNotAuthorized) => {
199 index += 1;
200 continue;
201 }
202 Err(GetGenericPortMappingEntryError::RequestError(err)) => {
203 warn!("[UPnP] request existing port mapping err: {:?}", err);
204 break false;
205 }
206 Err(GetGenericPortMappingEntryError::SpecifiedArrayIndexInvalid) => break false,
207 }
208 };
209 if already_in_use {
210 let port =
211 gateway.add_any_port(igd::PortMappingProtocol::TCP, local_addr, UPNP_DEADLINE_SEC as u32, UPNP_REGISTRATION_NAME)?;
212 info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
213 return Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })));
214 }
215
216 match gateway.add_port(
217 igd::PortMappingProtocol::TCP,
218 desired_external_port,
219 local_addr,
220 UPNP_DEADLINE_SEC as u32,
221 UPNP_REGISTRATION_NAME,
222 ) {
223 Ok(_) => {
224 info!("[UPnP] Added port mapping to default external port: {ip}:{desired_external_port}");
225 Ok(Some((
226 NetAddress { ip, port: desired_external_port },
227 ExtendHelper { gateway, local_addr, external_port: desired_external_port },
228 )))
229 }
230 Err(AddPortError::PortInUse {}) => {
231 let port = gateway.add_any_port(
232 igd::PortMappingProtocol::TCP,
233 local_addr,
234 UPNP_DEADLINE_SEC as u32,
235 UPNP_REGISTRATION_NAME,
236 )?;
237 info!("[UPnP] Added port mapping to random external port: {ip}:{port}");
238 Ok(Some((NetAddress { ip, port }, ExtendHelper { gateway, local_addr, external_port: port })))
239 }
240 Err(err) => Err(err.into()),
241 }
242 }
243
244 pub fn best_local_address(&mut self) -> Option<NetAddress> {
245 if self.local_net_addresses.is_empty() {
246 None
247 } else {
248 Some(self.local_net_addresses[0])
251 }
252 }
253
254 pub fn add_address(&mut self, address: NetAddress) {
255 if address.ip.is_loopback() || address.ip.is_unspecified() {
256 debug!("[Address manager] skipping local address {}", address.ip);
257 return;
258 }
259
260 if self.address_store.has(address) {
261 return;
262 }
263
264 self.address_store.set(address, 1);
266 }
267
268 pub fn mark_connection_failure(&mut self, address: NetAddress) {
269 if !self.address_store.has(address) {
270 return;
271 }
272
273 let new_count = self.address_store.get(address).connection_failed_count + 1;
274 if new_count > MAX_CONNECTION_FAILED_COUNT {
275 self.address_store.remove(address);
276 } else {
277 self.address_store.set(address, new_count);
278 }
279 }
280
281 pub fn mark_connection_success(&mut self, address: NetAddress) {
282 if !self.address_store.has(address) {
283 return;
284 }
285
286 self.address_store.set(address, 0);
287 }
288
289 pub fn iterate_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
290 self.address_store.iterate_addresses()
291 }
292
293 pub fn iterate_prioritized_random_addresses(&self, exceptions: HashSet<NetAddress>) -> impl ExactSizeIterator<Item = NetAddress> {
294 self.address_store.iterate_prioritized_random_addresses(exceptions)
295 }
296
297 pub fn ban(&mut self, ip: IpAddress) {
298 self.banned_address_store.set(ip.into(), ConnectionBanTimestamp(unix_now())).unwrap();
299 self.address_store.remove_by_ip(ip.into());
300 }
301
302 pub fn unban(&mut self, ip: IpAddress) {
303 self.banned_address_store.remove(ip.into()).unwrap();
304 }
305
306 pub fn is_banned(&mut self, ip: IpAddress) -> bool {
307 const MAX_BANNED_TIME: u64 = 24 * 60 * 60 * 1000;
308 match self.banned_address_store.get(ip.into()).unwrap_option() {
309 Some(timestamp) => {
310 if unix_now() - timestamp.0 > MAX_BANNED_TIME {
311 self.unban(ip);
312 false
313 } else {
314 true
315 }
316 }
317 None => false,
318 }
319 }
320
321 pub fn get_all_addresses(&self) -> Vec<NetAddress> {
322 self.address_store.iterate_addresses().collect_vec()
323 }
324
325 pub fn get_all_banned_addresses(&self) -> Vec<IpAddress> {
326 self.banned_address_store.iterator().map(|x| IpAddress::from(x.unwrap().0)).collect_vec()
327 }
328}
329
330mod address_store_with_cache {
331 use std::{
334 collections::{HashMap, HashSet},
335 net::IpAddr,
336 sync::Arc,
337 };
338
339 use itertools::Itertools;
340 use kaspa_database::prelude::{CachePolicy, DB};
341 use kaspa_utils::networking::PrefixBucket;
342 use rand::{
343 distributions::{WeightedError, WeightedIndex},
344 prelude::Distribution,
345 };
346
347 use crate::{
348 stores::{
349 address_store::{AddressesStore, DbAddressesStore, Entry},
350 AddressKey,
351 },
352 NetAddress, MAX_ADDRESSES, MAX_CONNECTION_FAILED_COUNT,
353 };
354
355 pub struct Store {
356 db_store: DbAddressesStore,
357 addresses: HashMap<AddressKey, Entry>,
358 }
359
360 impl Store {
361 fn new(db: Arc<DB>) -> Self {
362 let db_store = DbAddressesStore::new(db, CachePolicy::Empty);
364 let mut addresses = HashMap::new();
365 for (key, entry) in db_store.iterator().map(|res| res.unwrap()) {
366 addresses.insert(key, entry);
367 }
368
369 Self { db_store, addresses }
370 }
371
372 pub fn has(&mut self, address: NetAddress) -> bool {
373 self.addresses.contains_key(&address.into())
374 }
375
376 pub fn set(&mut self, address: NetAddress, connection_failed_count: u64) {
377 let entry = match self.addresses.get(&address.into()) {
378 Some(entry) => Entry { connection_failed_count, address: entry.address },
379 None => Entry { connection_failed_count, address },
380 };
381 self.db_store.set(address.into(), entry).unwrap();
382 self.addresses.insert(address.into(), entry);
383 self.keep_limit();
384 }
385
386 fn keep_limit(&mut self) {
387 while self.addresses.len() > MAX_ADDRESSES {
388 let to_remove =
389 self.addresses.iter().max_by(|a, b| (a.1).connection_failed_count.cmp(&(b.1).connection_failed_count)).unwrap();
390 self.remove_by_key(*to_remove.0);
391 }
392 }
393
394 pub fn get(&self, address: NetAddress) -> Entry {
395 *self.addresses.get(&address.into()).unwrap()
396 }
397
398 pub fn remove(&mut self, address: NetAddress) {
399 self.remove_by_key(address.into())
400 }
401
402 fn remove_by_key(&mut self, key: AddressKey) {
403 self.addresses.remove(&key);
404 self.db_store.remove(key).unwrap()
405 }
406
407 pub fn iterate_addresses(&self) -> impl Iterator<Item = NetAddress> + '_ {
408 self.addresses.values().map(|entry| entry.address)
409 }
410
411 pub fn iterate_prioritized_random_addresses(
429 &self,
430 exceptions: HashSet<NetAddress>,
431 ) -> impl ExactSizeIterator<Item = NetAddress> {
432 let exceptions: HashSet<AddressKey> = exceptions.into_iter().map(|addr| addr.into()).collect();
433 let mut prefix_counter: HashMap<PrefixBucket, usize> = HashMap::new();
434 let (mut weights, filtered_addresses): (Vec<f64>, Vec<NetAddress>) = self
435 .addresses
436 .iter()
437 .filter(|(addr_key, _)| !exceptions.contains(addr_key))
438 .map(|(_, e)| {
439 let count = prefix_counter.entry(e.address.prefix_bucket()).or_insert(0);
440 *count += 1;
441 (64f64.powf((MAX_CONNECTION_FAILED_COUNT + 1 - e.connection_failed_count) as f64), e.address)
442 })
443 .unzip();
444
445 for (i, address) in filtered_addresses.iter().enumerate() {
447 *weights.get_mut(i).unwrap() /= *prefix_counter.get(&address.prefix_bucket()).unwrap() as f64;
448 }
449
450 RandomWeightedIterator::new(weights, filtered_addresses)
451 }
452
453 pub fn remove_by_ip(&mut self, ip: IpAddr) {
454 for key in self.addresses.keys().filter(|key| key.is_ip(ip)).copied().collect_vec() {
455 self.remove_by_key(key);
456 }
457 }
458 }
459
460 pub fn new(db: Arc<DB>) -> Store {
461 Store::new(db)
462 }
463
464 pub struct RandomWeightedIterator {
465 weighted_index: Option<WeightedIndex<f64>>,
466 remaining: usize,
467 addresses: Vec<NetAddress>,
468 }
469
470 impl RandomWeightedIterator {
471 pub fn new(weights: Vec<f64>, addresses: Vec<NetAddress>) -> Self {
472 assert_eq!(weights.len(), addresses.len());
473 let remaining = weights.iter().filter(|&&w| w > 0.0).count();
474 let weighted_index = match WeightedIndex::new(weights) {
475 Ok(index) => Some(index),
476 Err(WeightedError::NoItem) => None,
477 Err(e) => panic!("{e}"),
478 };
479 Self { weighted_index, remaining, addresses }
480 }
481 }
482
483 impl Iterator for RandomWeightedIterator {
484 type Item = NetAddress;
485
486 fn next(&mut self) -> Option<Self::Item> {
487 if let Some(weighted_index) = self.weighted_index.as_mut() {
488 let i = weighted_index.sample(&mut rand::thread_rng());
489 match weighted_index.update_weights(&[(i, &0f64)]) {
491 Ok(_) => {}
492 Err(WeightedError::AllWeightsZero) => self.weighted_index = None,
493 Err(e) => panic!("{e}"),
494 }
495 self.remaining -= 1;
496 if self.remaining == 0 {
497 self.weighted_index = None;
498 }
499 Some(self.addresses[i])
500 } else {
501 None
502 }
503 }
504
505 fn size_hint(&self) -> (usize, Option<usize>) {
506 (self.remaining, Some(self.remaining))
507 }
508 }
509
510 impl ExactSizeIterator for RandomWeightedIterator {}
511
512 #[cfg(test)]
513 mod tests {
514 use std::str::FromStr;
515
516 use super::*;
517 use address_manager::AddressManager;
518 use kaspa_consensus_core::config::{params::SIMNET_PARAMS, Config};
519 use kaspa_core::task::tick::TickService;
520 use kaspa_database::create_temp_db;
521 use kaspa_database::prelude::ConnBuilder;
522 use kaspa_utils::networking::IpAddress;
523 use statest::ks::KSTest;
524 use statrs::distribution::Uniform;
525 use std::net::{IpAddr, Ipv6Addr};
526
/// Zero-weight entries are never yielded, and an empty input yields nothing.
#[test]
fn test_weighted_iterator() {
    let address = NetAddress::new(IpAddr::V6(Ipv6Addr::LOCALHOST).into(), 1);

    let two_positive = RandomWeightedIterator::new(vec![0.2, 0.3, 0.0], vec![address; 3]);
    assert_eq!(two_positive.len(), 2);
    assert_eq!(two_positive.count(), 2);

    let empty = RandomWeightedIterator::new(Vec::new(), Vec::new());
    assert_eq!(empty.len(), 0);
    assert_eq!(empty.count(), 0);
}
538
/// Checks that prioritized random iteration spreads its picks across prefix
/// buckets even when the buckets differ greatly in size.
///
/// Buckets are filled with shrinking sizes (2048, 1024, ... down to 1). For
/// each trial the first `num_of_buckets` sampled addresses are mapped to
/// their prefix bucket and compared against a uniform distribution with a
/// one-sample Kolmogorov–Smirnov test. The test passes when the mean result
/// over all trials lies within `significance` of 0.5.
#[test]
fn test_network_distribution_weighting() {
    kaspa_core::log::try_init_logger("info");

    let largest_bucket: u16 = 2048;
    let bucket_reduction_ratio: f64 = 2.;

    assert!(bucket_reduction_ratio >= 1.25);

    let db = create_temp_db!(ConnBuilder::default().with_files_limit(10));
    let config = Config::new(SIMNET_PARAMS);
    let (am, _) = AddressManager::new(Arc::new(config), db.1, Arc::new(TickService::default()));

    let mut am_guard = am.lock();

    let mut num_of_buckets = 0;
    let mut num_of_addresses = 0;
    let mut current_bucket_size = largest_bucket;

    for current_prefix_bytes in 0..u16::MAX {
        num_of_buckets += 1;
        // The prefix forms the first two octets, the suffix the last two.
        let [prefix_hi, prefix_lo] = current_prefix_bytes.to_be_bytes();
        for current_suffix_bytes in 0..current_bucket_size {
            let [suffix_hi, suffix_lo] = current_suffix_bytes.to_be_bytes();
            let ip = IpAddress::from_str(&format!("{prefix_hi}.{prefix_lo}.{suffix_hi}.{suffix_lo}")).unwrap();
            am_guard.add_address(NetAddress::new(ip, 16111));
            num_of_addresses += 1;
        }

        let last_bucket_size = current_bucket_size;
        current_bucket_size = ((current_bucket_size as f64) * (1.0 / bucket_reduction_ratio)).round() as u16;

        // Stop once the bucket size no longer shrinks or has reached zero.
        if current_bucket_size == last_bucket_size || current_bucket_size == 0 {
            break;
        }
    }
    drop(am_guard);

    assert!(1024 <= num_of_addresses);
    assert!(num_of_addresses <= MAX_ADDRESSES);
    assert!(num_of_buckets >= 12);

    let num_of_trials = 512;
    let mut cul_p = 0.;
    let target_u_dist = Uniform::new(0.0, (num_of_buckets) as f64).unwrap();
    for _ in 0..num_of_trials {
        let prioritized_address_distribution = am
            .lock()
            .iterate_prioritized_random_addresses(HashSet::new())
            .take(num_of_buckets)
            .map(|addr| addr.prefix_bucket().as_u64() as f64)
            .collect_vec();

        let ks_test = KSTest::new(prioritized_address_distribution.as_slice());
        cul_p += ks_test.ks1(&target_u_dist).0;
    }

    let adjusted_p = (0.5 - cul_p / num_of_trials as f64).abs();
    let significance = 0.10;

    kaspa_core::info!(
        "Kolmogorov–Smirnov test result for weighted network distribution uniformity: p = {0:.4} (p < {1})",
        adjusted_p,
        significance
    );
    assert!(adjusted_p <= significance)
}
624 }
625}