use super::*;
// NOTE(review): presumably selects the "net" log facility for the `veilid_log!`
// calls in this file — confirm against the macro definition.
impl_veilid_log_facility!("net");
/// Number of inconsistent address reports counted for one cache key at which
/// `detect_for_inbound_capable` asks for address detection.
pub const ADDRESS_INCONSISTENCY_DETECTION_COUNT: usize = 5;
/// Number of consistency-cache entries that must carry the same socket address
/// for `detect_for_outbound_only` to ask for address detection.
pub const ADDRESS_CONSISTENCY_DETECTION_COUNT: usize = 5;
/// Capacity passed to `hashlink::LruCache::new` for each per-key consistency cache.
pub const ADDRESS_CHECK_CACHE_SIZE: usize = 10;
/// Settings captured once in `AddressCheck::new` that steer address change detection.
#[derive(Debug)]
pub struct AddressCheckConfig {
/// Taken from `Network::resolved_detect_address_changes()`. When false,
/// `detect_for_outbound_only` never reports a change and a detected change
/// only produces a warning instead of triggering a dial info update.
pub detect_address_changes: bool,
/// Taken from `config.network.max_connections_per_ip6_prefix_size`; passed to
/// `ip_to_ipblock` when comparing the reporter's address with the reported one.
pub ip6_prefix_size: usize,
/// Taken from `config.network.privacy.require_inbound_relay`. When true,
/// `report_socket_address_change` returns before running any detection.
pub require_inbound_relay: bool,
}
/// Key for the per-address tables in `AddressCheck`: one entry per combination
/// of routing domain, protocol type and address type.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct AddressCheckCacheKey(RoutingDomain, ProtocolType, AddressType);
/// Tracks the addresses published for each routing domain together with the
/// counters and caches the detection methods use to decide whether a reported
/// socket address calls for dial info re-detection.
pub struct AddressCheck {
/// Component registry obtained from `net.registry()` in `AddressCheck::new`.
registry: VeilidComponentRegistry,
/// Detection settings captured in `AddressCheck::new`.
config: AddressCheckConfig,
/// Network handle used to look up the preferred local address and to trigger
/// dial info updates.
net: Network,
/// Peer info last passed to `report_peer_info_change`, per routing domain.
published_peer_info: BTreeMap<RoutingDomain, Arc<PeerInfo>>,
/// Socket addresses taken from the published dial info, per cache key.
/// Addresses from `Direct` and `Mapped` dial info are stored with port 0.
current_addresses: BTreeMap<AddressCheckCacheKey, HashSet<SocketAddress>>,
/// Per-key counter maintained by `detect_for_inbound_capable`: incremented by a
/// report that does not match `current_addresses` while the reporter's previous
/// address did, and reset to 0 by a matching report.
address_inconsistency_table: BTreeMap<AddressCheckCacheKey, usize>,
/// Per-key cache, filled by `detect_for_outbound_only`, mapping a reporting
/// ip block to the socket address it reported.
address_consistency_table:
BTreeMap<AddressCheckCacheKey, hashlink::LruCache<IpAddr, SocketAddress>>,
}
// Manual `Debug` implementation: prints the configuration and the tracking
// tables, and leaves out the `registry` and `net` handles.
impl fmt::Debug for AddressCheck {
    /// Writes the struct in `debug_struct` form, one entry per printed field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("AddressCheck");
        out.field("config", &self.config);
        // Labelled with the actual field name so debug dumps can be matched
        // against the struct definition (was mislabelled "current_peer_info").
        out.field("published_peer_info", &self.published_peer_info);
        out.field("current_addresses", &self.current_addresses);
        out.field(
            "address_inconsistency_table",
            &self.address_inconsistency_table,
        );
        out.field("address_consistency_table", &self.address_consistency_table);
        out.finish()
    }
}
// NOTE(review): `self.registry()` is called by the detection methods of
// `AddressCheck` but is not defined in this file; presumably this macro
// generates it — confirm against the macro definition.
impl_veilid_component_accessors!(AddressCheck);
impl AddressCheck {
/// Builds an address checker for `net` with empty tracking tables.
///
/// The detection settings are read once here: `detect_address_changes` comes
/// from `net.resolved_detect_address_changes()`, the other two values from the
/// configuration exposed by the network's registry.
pub fn new(net: Network) -> Self {
    let registry = net.registry();

    // Snapshot the settings so later reports never have to consult the
    // configuration again.
    let config = {
        let veilid_config = registry.config();
        AddressCheckConfig {
            detect_address_changes: net.resolved_detect_address_changes(),
            ip6_prefix_size: veilid_config.network.max_connections_per_ip6_prefix_size as usize,
            require_inbound_relay: veilid_config.network.privacy.require_inbound_relay,
        }
    };

    Self {
        registry,
        config,
        net,
        published_peer_info: BTreeMap::new(),
        current_addresses: BTreeMap::new(),
        address_inconsistency_table: BTreeMap::new(),
        address_consistency_table: BTreeMap::new(),
    }
}
/// Replaces everything tracked for `routing_domain` with the contents of the
/// newly published peer info.
///
/// The current addresses and both detection tables are cleared for every
/// protocol type / address type combination of the routing domain. With
/// `Some(peer_info)` the peer info is stored and the socket addresses of its
/// dial info become the new current addresses; with `None` the stored peer
/// info for the routing domain is removed.
pub fn report_peer_info_change(
    &mut self,
    routing_domain: RoutingDomain,
    opt_peer_info: Option<Arc<PeerInfo>>,
) {
    // Forget all state previously recorded for this routing domain
    let stale_keys = ProtocolTypeSet::all()
        .into_iter()
        .flat_map(move |protocol_type| {
            AddressTypeSet::all().into_iter().map(move |address_type| {
                AddressCheckCacheKey(routing_domain, protocol_type, address_type)
            })
        });
    for stale_key in stale_keys {
        self.current_addresses.remove(&stale_key);
        self.address_inconsistency_table.remove(&stale_key);
        self.address_consistency_table.remove(&stale_key);
    }

    // Nothing published any more: drop the stored peer info and stop
    let Some(peer_info) = opt_peer_info else {
        self.published_peer_info.remove(&routing_domain);
        return;
    };

    self.published_peer_info
        .insert(routing_domain, Arc::clone(&peer_info));

    for did in peer_info.node_info().dial_info_detail_list() {
        // Direct and Mapped entries are stored with port 0, which lets
        // `matches_current_address` accept a report on any port of that address
        let full_address = did.dial_info.socket_address();
        let tracked_address =
            if matches!(did.class, DialInfoClass::Direct | DialInfoClass::Mapped) {
                full_address.with_port(0)
            } else {
                full_address
            };

        let address_type = did.dial_info.address_type();
        let protocol_type = did.dial_info.protocol_type();
        let key = AddressCheckCacheKey(routing_domain, protocol_type, address_type);

        self.current_addresses
            .entry(key)
            .or_default()
            .insert(tracked_address);
    }
}
pub fn report_socket_address_change(
&mut self,
routing_domain: RoutingDomain, socket_address: SocketAddress, old_socket_address: Option<SocketAddress>, flow: Flow, reporting_peer: NodeRef, ) {
if !matches!(routing_domain, RoutingDomain::PublicInternet) {
return;
}
let Some(peer_info) = self.published_peer_info.get(&routing_domain).cloned() else {
return;
};
let Some(pla) = self
.net
.get_preferred_local_address_by_key(flow.protocol_type(), flow.address_type())
else {
return;
};
let Some(local) = flow.local() else {
return;
};
if local.port() != pla.port() {
veilid_log!(self debug target:"network_result", "ignoring address report because local port did not match listener: {} != {}", local.port(), pla.port());
return;
}
let reporting_ipblock =
ip_to_ipblock(self.config.ip6_prefix_size, flow.remote_address().ip_addr());
let Some(reporting_node_info) = reporting_peer.node_info(routing_domain) else {
return;
};
if !reporting_node_info.has_dial_info() {
return;
}
if reporting_ipblock == ip_to_ipblock(self.config.ip6_prefix_size, socket_address.ip_addr())
{
return;
}
if self.config.require_inbound_relay {
return;
}
let needs_address_detection = if peer_info.node_info().has_dial_info() {
self.detect_for_inbound_capable(
routing_domain,
socket_address,
old_socket_address,
flow,
reporting_peer,
)
} else {
self.detect_for_outbound_only(routing_domain, socket_address, flow, reporting_ipblock)
};
if needs_address_detection {
if self.config.detect_address_changes && !self.config.require_inbound_relay {
veilid_log!(self info
"{:?} address has changed, detecting dial info",
routing_domain
);
self.net.trigger_update_dial_info(routing_domain);
} else {
veilid_log!(self warn
"{:?} address may have changed. Restarting the server may be required.",
routing_domain
);
}
}
}
/// Returns true when `socket_address` is one of the current addresses stored
/// under `acckey`, either exactly or as its port-0 form.
///
/// Returns false when nothing is stored under `acckey`.
fn matches_current_address(
    &self,
    acckey: AddressCheckCacheKey,
    socket_address: SocketAddress,
) -> bool {
    let Some(known_addresses) = self.current_addresses.get(&acckey) else {
        return false;
    };
    // The port-0 lookup covers entries that were stored without a port
    known_addresses.contains(&socket_address)
        || known_addresses.contains(&socket_address.with_port(0))
}
/// Detection used while our published node info has dial info.
///
/// Keeps one inconsistency counter per (routing domain, protocol, address type):
/// - a report that matches a current address resets the counter to 0,
/// - a report that does not match, while `old_socket_address` did match,
///   increments the counter,
/// - any other report leaves the counter untouched.
///
/// Returns true only when an increment brings the counter to at least
/// `ADDRESS_INCONSISTENCY_DETECTION_COUNT`.
fn detect_for_inbound_capable(
    &mut self,
    routing_domain: RoutingDomain,
    socket_address: SocketAddress,
    old_socket_address: Option<SocketAddress>,
    flow: Flow,
    reporting_peer: NodeRef,
) -> bool {
    let acckey =
        AddressCheckCacheKey(routing_domain, flow.protocol_type(), flow.address_type());

    // The reported address is still one of ours: clear any accumulated inconsistency
    if self.matches_current_address(acckey, socket_address) {
        let registry = self.registry();
        let counter = self.address_inconsistency_table.entry(acckey).or_insert(0);
        if *counter != 0 {
            veilid_log!(registry debug "Resetting address inconsistency for {:?} due to match on flow {:?} from {}", acckey, flow, reporting_peer);
            *counter = 0;
        }
        return false;
    }

    // A mismatch only counts when this reporter's previous address did match
    let Some(previous_address) = old_socket_address else {
        return false;
    };
    if !self.matches_current_address(acckey, previous_address) {
        return false;
    }

    let counter = self.address_inconsistency_table.entry(acckey).or_insert(0);
    *counter += 1;
    let val = *counter;
    veilid_log!(self debug "Adding address inconsistency ({}) for {:?} due to address {} on flow {:?} from {}", val, acckey, socket_address, flow, reporting_peer);

    val >= ADDRESS_INCONSISTENCY_DETECTION_COUNT
}
/// Detection used while our published node info has no dial info.
///
/// Stores `socket_address` under `reporting_ipblock` in the consistency cache
/// for the flow's (routing domain, protocol, address type) key, creating the
/// cache with capacity `ADDRESS_CHECK_CACHE_SIZE` if needed. Returns true as
/// soon as `ADDRESS_CONSISTENCY_DETECTION_COUNT` cache entries are found to
/// carry the same socket address.
///
/// Always returns false, without touching the cache, when
/// `detect_address_changes` is disabled.
fn detect_for_outbound_only(
    &mut self,
    routing_domain: RoutingDomain,
    socket_address: SocketAddress,
    flow: Flow,
    reporting_ipblock: IpAddr,
) -> bool {
    if !self.config.detect_address_changes {
        return false;
    }

    // Fetched up front: the cache below keeps `self` mutably borrowed while logging
    let registry = self.registry();
    let acckey =
        AddressCheckCacheKey(routing_domain, flow.protocol_type(), flow.address_type());

    // Record what this reporter sees
    let cache = self
        .address_consistency_table
        .entry(acckey)
        .or_insert_with(|| hashlink::LruCache::new(ADDRESS_CHECK_CACHE_SIZE));
    cache.insert(reporting_ipblock, socket_address);

    // Tally how many cache entries agree on each address
    let mut consistencies: HashMap<SocketAddress, usize> = HashMap::new();
    for (_reporter, seen_address) in cache.iter() {
        let tally = consistencies.entry(*seen_address).or_insert(0);
        *tally += 1;
        if *tally >= ADDRESS_CONSISTENCY_DETECTION_COUNT {
            veilid_log!(registry debug "Address consistency detected for {:?}: {}", acckey, seen_address);
            return true;
        }
    }

    false
}
}