use super::*;
impl RoutingTable {
/// Records `last_connection` as the recent-peers entry for `node_id`.
///
/// The recent-peers table is locked only for the duration of the insert.
pub fn touch_recent_peer(&self, node_id: NodeId, last_connection: Flow) {
    let entry = RecentPeersEntry { last_connection };
    let mut recent_peers = self.recent_peers.lock();
    recent_peers.insert(node_id, entry);
}
/// Returns the recent peers that still resolve to a node with a known last flow,
/// pruning every other peer from the recent-peers table.
///
/// Each returned entry carries the flow reported by the node's `last_flow()` at
/// call time, not the value stored in the table. A peer is pruned when
/// `lookup_node_id` does not yield a node for it, or when that node has no last
/// flow. The recent-peers lock is held for the whole call, including the lookups.
#[cfg_attr(feature = "instrument", instrument(level = "trace", skip_all, fields(__VEILID_LOG_KEY = self.log_key())))]
pub fn get_recent_peers(&self) -> Vec<(NodeId, RecentPeersEntry)> {
    let mut table = self.recent_peers.lock();
    let mut live = Vec::new();
    let mut stale = Vec::new();

    for (peer_id, _) in table.iter() {
        // A failed lookup, a missing node and a node without a flow are all treated alike
        let last_flow = match self.lookup_node_id(peer_id.clone()) {
            Ok(Some(node_ref)) => node_ref.last_flow(),
            _ => None,
        };
        match last_flow {
            Some(last_connection) => {
                live.push((peer_id.clone(), RecentPeersEntry { last_connection }));
            }
            None => stale.push(peer_id.clone()),
        }
    }

    // Removal is deferred until the iteration borrow of the table has ended
    for peer_id in stale {
        table.remove(&peer_id);
    }

    live
}
/// Returns a shared handle to the most recently stored routing table health report.
///
/// Only the `Arc` is cloned; the report itself is not copied.
pub fn get_routing_table_health(&self) -> Arc<RoutingTableHealth> {
    let current = self.routing_table_health.lock();
    Arc::clone(&*current)
}
/// Ranks the node identified by `node_id` among the entries of `snapshot`
/// accepted by `filter`.
///
/// The accepted entries are sorted with `BucketEntrySnapshot::cmp_fastest_reliable`
/// using `metric`, and the node's position in that ordering is reported.
///
/// Returns `None` when no accepted entry lists `node_id` among its node ids.
/// Otherwise returns the node's index in the sorted list, the number of accepted
/// entries, and a percentile computed as `100 - index * 100 / count`, so the
/// first entry in the ordering scores 100.
#[cfg_attr(
    feature = "instrument",
    instrument(level = "trace", skip(self, filter, metric), ret, fields(__VEILID_LOG_KEY = self.log_key()))
)]
pub(in crate::routing_table) fn get_node_relative_performance(
    &self,
    node_id: NodeId,
    snapshot: &EntrySnapshot,
    filter: impl Fn(&BucketEntrySnapshot) -> bool,
    metric: impl Fn(&LatencyStats) -> TimestampDuration,
) -> Option<NodeRelativePerformance> {
    let mut ranked: Vec<&BucketEntrySnapshot> = snapshot
        .entries()
        .iter()
        .filter(|candidate| filter(candidate))
        .collect();
    ranked.sort_by(|lhs, rhs| BucketEntrySnapshot::cmp_fastest_reliable(lhs, rhs, &metric));

    // Bail out if the node is not among the accepted entries
    let node_index = ranked
        .iter()
        .position(|candidate| candidate.node_ids.contains(&node_id))?;
    let node_count = ranked.len();

    // With verbose tracing, list every entry that sorted ahead of this node
    #[cfg(feature = "verbose-tracing")]
    for (rank, better) in ranked.iter().take(node_index).enumerate() {
        if let (Some(better_id), Some(latency)) =
            (better.best_node_id(), &better.peer_stats.latency)
        {
            veilid_log!(self debug "Better relay {}: {}: {}", rank, better_id, latency);
        }
    }

    let percentile = 100.0f32 - ((node_index * 100) as f32) / (node_count as f32);
    Some(NodeRelativePerformance {
        percentile,
        node_index,
        node_count,
    })
}
/// Recomputes the routing table health report and the per-routing-domain entry
/// summaries and low water marks from a fresh snapshot of the table's entries.
///
/// For every routing domain this:
/// - builds an `EntrySummary` from the entries that have peer info with at least
///   one signature in that domain,
/// - builds a `LowWaterMark` holding one `CapabilityCounts` per valid crypto kind,
/// - hands both to the domain's controller, resetting the controller's low water
///   mark first when the domain is in `reset_low_water_mark_domains`,
/// - records the controller's health in the new report.
///
/// The new report then replaces the stored one. A debug line is logged when the
/// live entry count or the inbound/outbound readiness of any routing domain
/// differs from the previous report.
pub(in crate::routing_table) fn refresh_summaries(
    &self,
    reset_low_water_mark_domains: RoutingDomainSet,
) {
    // NOTE(review): the second argument is presumably the lowest entry state to
    // include in the snapshot — confirm against `snapshot_entries`.
    let entry_snapshot = self.snapshot_entries(Timestamp::now(), BucketEntryState::Punished);

    // Tally the snapshot entries by state
    let mut routing_table_health = RoutingTableHealth::default();
    for entry in entry_snapshot.entries().iter() {
        routing_table_health.total_entry_count += 1;
        match entry.state {
            BucketEntryState::Reliable => {
                routing_table_health.reliable_entry_count += 1;
            }
            BucketEntryState::Unreliable => {
                routing_table_health.unreliable_entry_count += 1;
            }
            BucketEntryState::Dead => {
                routing_table_health.dead_entry_count += 1;
            }
            BucketEntryState::Punished => {
                routing_table_health.punished_entry_count += 1;
            }
        }
    }

    for routing_domain in RoutingDomainSet::all() {
        // Summarize only entries whose peer info in this domain carries signatures
        let mut entry_summary = EntrySummary::new();
        for entry in entry_snapshot.entries().iter() {
            if let Some(pi) = entry.get_peer_info(routing_domain) {
                if !pi.signatures().is_empty() {
                    entry_summary.add_entry(routing_domain, entry);
                }
            }
        }

        // Count capabilities per crypto kind.
        // NOTE(review): `state >= Unreliable` presumably selects the Unreliable and
        // Reliable entries — confirm against the ordering of `BucketEntryState`.
        let mut low_water_mark = LowWaterMark::new();
        for ck in VALID_CRYPTO_KINDS {
            let mut count = CapabilityCounts::new();
            for entry in entry_snapshot.entries().iter() {
                if entry.state >= BucketEntryState::Unreliable
                    && entry.routing_domain_set().contains(routing_domain)
                    && entry.crypto_kinds().contains(&ck)
                {
                    count.add_entry(routing_domain, entry);
                }
            }
            low_water_mark.set(ck, count);
        }

        let rdc = self.get_routing_domain_controller(routing_domain);
        {
            // The read guard is scoped so it is released before `get_health` is called
            let rdd = rdc.read();
            rdd.set_entry_summary(Arc::new(entry_summary));
            if reset_low_water_mark_domains.contains(routing_domain) {
                rdd.reset_low_water_mark();
            }
            rdd.update_low_water_mark(Arc::new(low_water_mark));
        }
        let health = rdc.get_health();
        routing_table_health
            .routing_domain_health
            .insert(routing_domain, health);
    }

    // Publish the new report and keep the previous one for comparison
    let current_routing_table_health = Arc::new(routing_table_health);
    let last_routing_table_health = {
        let mut rth_lock = self.routing_table_health.lock();
        core::mem::replace(&mut *rth_lock, current_routing_table_health.clone())
    };

    // Readiness changed if any domain's (inbound, outbound) pair differs, including
    // a domain being present in only one of the two reports
    let readiness_changed = RoutingDomainSet::all().into_iter().any(|routing_domain| {
        let last_ready = last_routing_table_health
            .routing_domain_health
            .get(&routing_domain)
            .map(|h| (h.is_ready_inbound, h.is_ready_outbound));
        let current_ready = current_routing_table_health
            .routing_domain_health
            .get(&routing_domain)
            .map(|h| (h.is_ready_inbound, h.is_ready_outbound));
        last_ready != current_ready
    });

    if last_routing_table_health.live_entry_count()
        != current_routing_table_health.live_entry_count()
        || readiness_changed
    {
        veilid_log!(self debug "Routing Table Health:\n{}", indent_all_string(&current_routing_table_health));
    }
}
}