use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
impl_veilid_log_facility!("rtab");
/// Starting interval, in seconds, between pings of a reliable node.
/// NOTE(review): not referenced in this part of the file; presumably consumed by the ping scheduler — confirm.
pub(crate) const RELIABLE_PING_INTERVAL_START_SECS: u32 = 10;
/// Upper bound, in seconds, on the interval between pings of a reliable node.
/// NOTE(review): not referenced in this part of the file — confirm usage.
pub(crate) const RELIABLE_PING_INTERVAL_MAX_SECS: u32 = 10 * 60;
/// Growth factor applied to the reliable ping interval.
/// NOTE(review): not referenced in this part of the file — confirm usage.
pub(crate) const RELIABLE_PING_INTERVAL_MULTIPLIER: f64 = 2.0;
/// Span, in seconds, used by `check_unreliable` (a node must have been seen consecutively
/// for at least this long to stop being unreliable) and by `check_dead` (a node not seen
/// for at least this long is a candidate for being dead).
pub(crate) const UNRELIABLE_PING_SPAN_SECS: u32 = 60;
/// Ping interval, in seconds, for unreliable nodes. `check_dead` divides the span by this
/// value to get the number of lost answers required before declaring `NoPingResponse`.
pub(crate) const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
/// A node is unreliable once it has more than this many recent lost answers on unordered protocols.
pub(crate) const UNRELIABLE_LOST_ANSWERS_UNORDERED: u32 = 2;
/// A node is unreliable once it has more than this many recent lost answers on ordered protocols.
pub(crate) const UNRELIABLE_LOST_ANSWERS_ORDERED: u32 = 0;
/// Threshold used by `check_dead`: this many consecutive send failures, or this many lost
/// answers from a node that has never been seen, marks the node dead.
pub(crate) const NEVER_SEEN_PING_COUNT: u32 = 3;
/// How long after its last recorded timestamp a flow on a non-ordered protocol is still
/// treated as live by `last_flows`.
pub(crate) const CONNECTIONLESS_TIMEOUT: TimestampDuration = TimestampDuration::new_secs(15);
/// Why `check_dead` considers a bucket entry dead.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryDeadReason {
/// `failed_to_send` reached `NEVER_SEEN_PING_COUNT`.
CanNotSend,
/// The node was never seen and its recent lost answers reached `NEVER_SEEN_PING_COUNT`.
TooManyLostAnswers,
/// The node was not seen for the unreliable ping span and lost a full span's worth of answers.
NoPingResponse,
}
/// Why `check_unreliable` considers a bucket entry unreliable.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryUnreliableReason {
/// At least one send failure is currently recorded.
FailedToSend,
/// Recent lost answers exceed the ordered or unordered tolerance.
LostAnswers,
/// No `first_consecutive_seen_ts` is recorded.
NotSeenConsecutively,
/// The node has been seen consecutively, but for less than `UNRELIABLE_PING_SPAN_SECS`.
InUnreliablePingSpan,
}
/// The state of a bucket entry together with the reason for it, as computed by
/// `BucketEntryInner::state_reason`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BucketEntryStateReason {
/// The entry has a punishment recorded.
Punished(PunishmentReason),
/// The entry is dead for the given reason.
Dead(BucketEntryDeadReason),
/// The entry is alive but unreliable for the given reason.
Unreliable(BucketEntryUnreliableReason),
/// None of the punished, dead or unreliable conditions apply.
Reliable,
}
/// Coarse state of a bucket entry, without the reason.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order below defines the ordering:
/// `Punished < Dead < Unreliable < Reliable`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum BucketEntryState {
Punished,
Dead,
Unreliable,
Reliable,
}
impl BucketEntryState {
pub fn is_alive(&self) -> bool {
match self {
BucketEntryState::Punished => false,
BucketEntryState::Dead => false,
BucketEntryState::Unreliable => true,
BucketEntryState::Reliable => true,
}
}
pub fn ordering(&self) -> usize {
match self {
BucketEntryState::Punished => 0,
BucketEntryState::Dead => 1,
BucketEntryState::Unreliable => 2,
BucketEntryState::Reliable => 3,
}
}
}
impl From<BucketEntryStateReason> for BucketEntryState {
fn from(value: BucketEntryStateReason) -> Self {
match value {
BucketEntryStateReason::Punished(_) => BucketEntryState::Punished,
BucketEntryStateReason::Dead(_) => BucketEntryState::Dead,
BucketEntryStateReason::Unreliable(_) => BucketEntryState::Unreliable,
BucketEntryStateReason::Reliable => BucketEntryState::Reliable,
}
}
}
/// Key of the `last_flows` map: the protocol type and address type of a flow.
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub(crate) struct LastFlowKey(pub ProtocolType, pub AddressType);
/// Key of the `last_sender_info` map: routing domain, protocol type and address type.
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub(crate) struct LastSenderInfoKey(pub RoutingDomain, pub ProtocolType, pub AddressType);
/// Per-entry state for the `PublicInternet` routing domain.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct BucketEntryPublicInternet {
// Most recently accepted peer info for this domain, if any (see `update_peer_info`).
peer_info: Option<Arc<PeerInfo>>,
// Timestamp recorded by `set_seen_our_node_info_ts`; initialized to 0 by `BucketEntry::new`.
last_seen_our_node_info_ts: Timestamp,
// Most recent status stored by `update_node_status`, if any.
node_status: Option<NodeStatus>,
}
/// Multi-line, human-readable dump of the public-internet state of an entry.
impl fmt::Display for BucketEntryPublicInternet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.peer_info {
            Some(pi) => {
                writeln!(f, "peer_info:")?;
                write!(f, " {}", indent_string(pi))?;
            }
            None => writeln!(f, "peer_info: None")?,
        }
        let seen_ts = &self.last_seen_our_node_info_ts;
        writeln!(f, "last_seen_our_node_info_ts: {}", seen_ts)?;
        writeln!(f, "node_status: {:?}", self.node_status)
    }
}
/// Per-entry state for the `LocalNetwork` routing domain.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct BucketEntryLocalNetwork {
// Most recently accepted peer info for this domain, if any (see `update_peer_info`).
peer_info: Option<Arc<PeerInfo>>,
// Timestamp recorded by `set_seen_our_node_info_ts`; initialized to 0 by `BucketEntry::new`.
last_seen_our_node_info_ts: Timestamp,
// Most recent status stored by `update_node_status`, if any.
node_status: Option<NodeStatus>,
}
/// Multi-line, human-readable dump of the local-network state of an entry.
impl fmt::Display for BucketEntryLocalNetwork {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.peer_info {
            Some(pi) => {
                writeln!(f, "peer_info:")?;
                write!(f, " {}", indent_string(pi))?;
            }
            None => writeln!(f, "peer_info: None")?,
        }
        let seen_ts = &self.last_seen_our_node_info_ts;
        writeln!(f, "last_seen_our_node_info_ts: {}", seen_ts)?;
        writeln!(f, "node_status: {:?}", self.node_status)
    }
}
/// The lock-protected contents of a `BucketEntry`.
///
/// Fields marked `#[serde(skip)]` are excluded from serialization.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct BucketEntryInner {
// Node ids for this entry; `add_node_id` looks them up by the id's kind.
node_ids: NodeIdGroup,
// Envelope versions supported by the node (see `add_envelope_version` / `set_envelope_support`).
envelope_support: Vec<EnvelopeVersion>,
// Set when peer info is accepted; cleared by `reset_updated_since_last_network_change`.
updated_since_last_network_change: bool,
// Latest flow and its timestamp per (protocol type, address type).
#[serde(skip)]
last_flows: BTreeMap<LastFlowKey, (Flow, Timestamp)>,
// Latest sender info reported per (routing domain, protocol type, address type).
#[serde(skip)]
last_sender_info: HashMap<LastSenderInfoKey, SenderInfo>,
// Per-routing-domain state.
public_internet: BucketEntryPublicInternet,
local_network: BucketEntryLocalNetwork,
// Published statistics for the peer; updated from the accounting fields below.
peer_stats: PeerStats,
#[serde(skip)]
latency_stats_accounting: LatencyStatsAccounting,
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
// Behind a mutex because `state_reason` records into it through `&self`.
#[serde(skip)]
state_stats_accounting: Mutex<StateStatsAccounting>,
#[serde(skip)]
answer_stats_accounting_unordered: AnswerStatsAccounting,
#[serde(skip)]
answer_stats_accounting_ordered: AnswerStatsAccounting,
// When `Some`, `state_reason` reports the entry as punished and `set_last_flow` is a no-op.
#[serde(skip)]
punishment: Option<PunishmentReason>,
#[cfg(feature = "geolocation")]
#[serde(skip)]
geolocation_info: GeolocationInfo,
// Next id handed out by `track`.
#[cfg(feature = "tracking")]
#[serde(skip)]
next_track_id: usize,
// Backtraces captured by `track`, keyed by track id.
#[cfg(feature = "tracking")]
#[serde(skip)]
node_ref_tracks: HashMap<usize, backtrace::Backtrace>,
}
/// Multi-line, human-readable dump of an entry: ids, envelope support, flows,
/// sender info, per-domain state, peer stats and punishment.
impl fmt::Display for BucketEntryInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "node_ids: {}", self.node_ids)?;
        writeln!(f, "envelope_support: {:?}", self.envelope_support)?;
        let updated = self.updated_since_last_network_change;
        writeln!(f, "updated_since_last_network_change: {:?}", updated)?;

        writeln!(f, "last_flows:")?;
        for (key, (flow, ts)) in &self.last_flows {
            writeln!(f, " {:?}/{:?}: {} @ {}", key.0, key.1, flow, ts)?;
        }

        writeln!(f, "last_sender_info:")?;
        for (key, info) in &self.last_sender_info {
            writeln!(
                f,
                " {:?}/{:?}/{:?}: {}",
                key.0, key.1, key.2, info.socket_address
            )?;
        }

        writeln!(f, "public_internet:")?;
        write!(f, "{}", indent_all_string(&self.public_internet))?;
        writeln!(f, "local_network:")?;
        write!(f, "{}", indent_all_string(&self.local_network))?;
        writeln!(f, "peer_stats:")?;
        write!(f, "{}", indent_all_string(&self.peer_stats))?;

        let punishment = match self.punishment {
            Some(reason) => format!("{:?}", reason),
            None => "None".to_owned(),
        };
        writeln!(f, "punishment: {}", punishment)
    }
}
impl BucketEntryInner {
/// Records an unresolved backtrace under a fresh track id and returns that id.
#[cfg(feature = "tracking")]
pub fn track(&mut self) -> usize {
    let id = self.next_track_id;
    self.next_track_id = id + 1;
    let backtrace = backtrace::Backtrace::new_unresolved();
    self.node_ref_tracks.insert(id, backtrace);
    id
}
/// Forgets the backtrace recorded for `track_id`, if any.
#[cfg(feature = "tracking")]
pub fn untrack(&mut self, track_id: usize) {
    let _removed = self.node_ref_tracks.remove(&track_id);
}
/// Returns a copy of the node ids of this entry.
pub fn node_ids(&self) -> NodeIdGroup {
    let ids = &self.node_ids;
    ids.clone()
}
/// Returns the public keys from the peer info stored for `routing_domain`,
/// or the default (empty) group when no peer info is stored.
pub fn public_keys(&self, routing_domain: RoutingDomain) -> PublicKeyGroup {
    let opt_peer_info = match routing_domain {
        RoutingDomain::LocalNetwork => &self.local_network.peer_info,
        RoutingDomain::PublicInternet => &self.public_internet.peer_info,
    };
    match opt_peer_info {
        Some(pi) => pi.node_info().public_keys(),
        None => Default::default(),
    }
}
/// Returns a copy of the first node id in the group, if there is one.
pub fn best_node_id(&self) -> Option<NodeId> {
    let first = self.node_ids.first()?;
    Some(first.clone())
}
/// Returns the first public key from the peer info stored for `routing_domain`,
/// or `None` when there is no peer info or it has no public keys.
pub fn best_public_key(&self, routing_domain: RoutingDomain) -> Option<PublicKey> {
    let opt_peer_info = match routing_domain {
        RoutingDomain::LocalNetwork => &self.local_network.peer_info,
        RoutingDomain::PublicInternet => &self.public_internet.peer_info,
    };
    let pi = opt_peer_info.as_ref()?;
    let keys = pi.node_info().public_keys();
    keys.first().cloned()
}
/// Adds `node_id` to the entry.
///
/// Returns `Ok(Some(old))` when an id of the same kind was present and differed
/// (the new id is added), and `Ok(None)` when the id was already present
/// unchanged or no id of that kind existed.
pub fn add_node_id(&mut self, node_id: NodeId) -> EyreResult<Option<NodeId>> {
    match self.node_ids.get(node_id.kind()) {
        // Identical id already stored: nothing to do.
        Some(existing) if existing == node_id => Ok(None),
        // Either no previous id of this kind, or a different one.
        previous => {
            self.node_ids.add(node_id);
            Ok(previous)
        }
    }
}
/// Removes the node id of the given crypto kind and returns whatever the
/// group's `remove` reports for it.
pub fn remove_node_id(&mut self, crypto_kind: CryptoKind) -> Option<NodeId> {
    let removed = self.node_ids.remove(crypto_kind);
    removed
}
/// Returns whether the node info stored for `routing_domain` has all of
/// `capabilities`; `false` when no node info is stored.
pub fn has_all_capabilities(
    &self,
    routing_domain: RoutingDomain,
    capabilities: &[VeilidCapability],
) -> bool {
    match self.node_info(routing_domain) {
        Some(ni) => ni.has_all_capabilities(capabilities),
        None => false,
    }
}
/// Returns whether the node info stored for `routing_domain` has any of
/// `capabilities`; `false` when no node info is stored.
pub fn has_any_capabilities(
    &self,
    routing_domain: RoutingDomain,
    capabilities: &[VeilidCapability],
) -> bool {
    match self.node_info(routing_domain) {
        Some(ni) => ni.has_any_capabilities(capabilities),
        None => false,
    }
}
/// Stores `peer_info` as the peer info for `routing_domain`, unless the stored
/// peer info is signed and at least as new.
///
/// Returns `true` only when signed peer info was already stored and the new
/// peer info is not equivalent to it; in that case all last flows except the
/// most recent one are dropped. Returns `false` otherwise, including when the
/// update is rejected.
pub fn update_peer_info(
&mut self,
routing_domain: RoutingDomain,
peer_info: Arc<PeerInfo>,
) -> bool {
let opt_current_pi = match routing_domain {
RoutingDomain::LocalNetwork => &mut self.local_network.peer_info,
RoutingDomain::PublicInternet => &mut self.public_internet.peer_info,
};
let mut node_info_changed = false;
if let Some(current_pi) = opt_current_pi {
// Only signed peer info is protected against being replaced by older info.
if !current_pi.signatures().is_empty() {
// Reject anything that is not strictly newer than what we have.
if peer_info.node_info().timestamp() <= current_pi.node_info().timestamp() {
// Same timestamp and not yet refreshed since the last network change:
// count this as a refresh and revive the entry if it was dead.
if !self.updated_since_last_network_change
&& peer_info.node_info().timestamp() == current_pi.node_info().timestamp()
{
self.updated_since_last_network_change = true;
self.make_not_dead(Timestamp::now());
}
return false;
}
if !peer_info.equivalent(current_pi) {
node_info_changed = true;
}
}
}
// Accept the new peer info and refresh everything derived from it.
let envelope_support = peer_info.node_info().envelope_support().to_vec();
*opt_current_pi = Some(peer_info.clone());
self.set_envelope_support(envelope_support);
self.updated_since_last_network_change = true;
self.make_not_dead(Timestamp::now());
#[cfg(feature = "geolocation")]
{
self.geolocation_info = peer_info.node_info().get_geolocation_info(routing_domain);
}
if node_info_changed {
self.clear_last_flows_except_latest();
}
node_info_changed
}
/// Recomputes `geolocation_info` from the public-internet peer info, if any
/// is stored; otherwise leaves it untouched.
#[cfg(feature = "geolocation")]
pub(super) fn update_geolocation_info(&mut self) {
    let Some(peer_info) = self.public_internet.peer_info.as_ref() else {
        return;
    };
    let node_info = peer_info.node_info();
    self.geolocation_info = node_info.get_geolocation_info(RoutingDomain::PublicInternet);
}
/// Returns the node info from the peer info stored for `routing_domain`, if any.
pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> {
    let stored = match routing_domain {
        RoutingDomain::LocalNetwork => self.local_network.peer_info.as_ref(),
        RoutingDomain::PublicInternet => self.public_internet.peer_info.as_ref(),
    };
    let peer_info = stored?;
    Some(peer_info.node_info())
}
/// Returns a clone of the `Arc` holding the peer info stored for
/// `routing_domain`, if any.
pub fn get_peer_info(&self, routing_domain: RoutingDomain) -> Option<Arc<PeerInfo>> {
    match routing_domain {
        RoutingDomain::LocalNetwork => self.local_network.peer_info.clone(),
        RoutingDomain::PublicInternet => self.public_internet.peer_info.clone(),
    }
}
/// Picks a routing domain for this entry out of `routing_domain_set`.
///
/// The first domain in the set's iteration order that has peer info stored
/// wins. If none has, the live last flows matching the set are consulted and
/// the smallest routing domain (by `<`) among them is returned, or `None`
/// when no flow maps to a routing domain.
pub fn best_routing_domain(
    &self,
    routing_table: &RoutingTable,
    routing_domain_set: RoutingDomainSet,
) -> Option<RoutingDomain> {
    let has_peer_info = |rd: &RoutingDomain| match rd {
        RoutingDomain::LocalNetwork => self.local_network.peer_info.is_some(),
        RoutingDomain::PublicInternet => self.public_internet.peer_info.is_some(),
    };
    if let Some(rd) = routing_domain_set.into_iter().find(has_peer_info) {
        return Some(rd);
    }

    // No peer info anywhere: fall back to the domains of live flows.
    let filter = NodeRefFilter::from(routing_domain_set);
    self.last_flows(routing_table, true, filter)
        .into_iter()
        .filter_map(|(flow, _ts)| routing_table.routing_domain_for_flow(flow))
        .reduce(|best, rd| if rd < best { rd } else { best })
}
/// Builds the `last_flows` map key for a flow from its protocol and address type.
fn flow_to_key(&self, last_flow: Flow) -> LastFlowKey {
    let protocol_type = last_flow.protocol_type();
    let address_type = last_flow.address_type();
    LastFlowKey(protocol_type, address_type)
}
/// Records `last_flow` with `timestamp` under its protocol/address key,
/// replacing any previous flow for that key. Does nothing while the entry
/// is punished.
pub(super) fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) {
    if self.punishment.is_none() {
        let flow_key = self.flow_to_key(last_flow);
        self.last_flows.insert(flow_key, (last_flow, timestamp));
    }
}
/// Removes whatever flow is recorded under the protocol/address key of `last_flow`.
pub(super) fn remove_last_flow(&mut self, last_flow: Flow) {
    let flow_key = self.flow_to_key(last_flow);
    let _removed = self.last_flows.remove(&flow_key);
}
/// Removes the last flows matched by `dial_info_filter`.
///
/// With the all-matching filter the map is simply cleared; otherwise a flow
/// is removed when both its protocol type and its address type are in the
/// filter's sets.
pub(super) fn clear_last_flows(&mut self, dial_info_filter: DialInfoFilter) {
    if dial_info_filter == DialInfoFilter::all() {
        self.last_flows.clear();
        return;
    }
    self.last_flows.retain(|key, _| {
        let matches_filter = dial_info_filter.protocol_type_set.contains(key.0)
            && dial_info_filter.address_type_set.contains(key.1);
        !matches_filter
    })
}
pub(super) fn clear_last_flows_except_latest(&mut self) {
if self.last_flows.is_empty() {
return;
}
let mut dead_keys = Vec::with_capacity(self.last_flows.len() - 1);
let mut most_recent_flow = None;
let mut most_recent_flow_time = 0u64;
for (k, v) in &self.last_flows {
let lct = v.1.as_u64();
if lct > most_recent_flow_time {
most_recent_flow = Some(k);
most_recent_flow_time = lct;
}
}
let Some(most_recent_flow) = most_recent_flow else {
return;
};
for k in self.last_flows.keys() {
if k != most_recent_flow {
dead_keys.push(k.clone());
}
}
for dk in dead_keys {
self.last_flows.remove(&dk);
}
}
pub(super) fn last_flows(
&self,
routing_table: &RoutingTable,
only_live: bool,
filter: NodeRefFilter,
) -> Vec<(Flow, Timestamp)> {
let opt_connection_manager = routing_table.network_manager().opt_connection_manager();
let mut out: Vec<(Flow, Timestamp)> = self
.last_flows
.iter()
.filter_map(|(k, v)| {
let include = {
routing_table.routing_domain_for_flow(v.0).map(|rd| {
filter.routing_domain_set.contains(rd)
&& filter.dial_info_filter.protocol_type_set.contains(k.0)
&& filter.dial_info_filter.address_type_set.contains(k.1)
}).unwrap_or(false)
};
if !include {
return None;
}
if !only_live {
return Some(*v);
}
let alive =
if matches!(v.0.protocol_type().sequence_ordering(), SequenceOrdering::Ordered) {
if let Some(connection_manager) = &opt_connection_manager {
connection_manager.get_connection(v.0).is_some()
} else {
false
}
} else {
let cur_ts = Timestamp::now();
v.1.later(CONNECTIONLESS_TIMEOUT) >= cur_ts
};
if alive {
Some(*v)
} else {
None
}
})
.collect();
out.sort_by(|a, b| b.1.cmp(&a.1));
out
}
/// Adds `envelope_version` to the supported versions if it is not already
/// present, then re-sorts the list by each version's position in
/// `VALID_ENVELOPE_VERSIONS`.
///
/// # Panics
/// Panics if `envelope_version` is not in `VALID_ENVELOPE_VERSIONS`.
pub(super) fn add_envelope_version(&mut self, envelope_version: EnvelopeVersion) {
    assert!(VALID_ENVELOPE_VERSIONS.contains(&envelope_version));
    if !self.envelope_support.contains(&envelope_version) {
        self.envelope_support.push(envelope_version);
        let rank = |version: &EnvelopeVersion| {
            VALID_ENVELOPE_VERSIONS
                .iter()
                .position(|x| x == version)
                .unwrap_or_log()
        };
        self.envelope_support
            .sort_by(|a, b| rank(a).cmp(&rank(b)));
    }
}
/// Replaces the supported envelope versions with `envelope_support`, sorted
/// by the natural ordering of `EnvelopeVersion` and with duplicates removed.
// NOTE(review): `add_envelope_version` orders by position in
// `VALID_ENVELOPE_VERSIONS` instead; confirm the two orderings agree.
pub(super) fn set_envelope_support(&mut self, envelope_support: Vec<EnvelopeVersion>) {
    let mut versions = envelope_support;
    versions.sort();
    versions.dedup();
    self.envelope_support = versions;
}
/// Returns the first supported envelope version that is also listed in
/// `VALID_ENVELOPE_VERSIONS`, if any.
pub fn best_envelope_version(&self) -> Option<EnvelopeVersion> {
    self.envelope_support
        .iter()
        .copied()
        .find(|version| VALID_ENVELOPE_VERSIONS.contains(version))
}
/// Computes the state of the entry at `cur_ts` together with its reason and
/// records it in the state statistics accounting.
///
/// Precedence: punished, then dead, then unreliable, otherwise reliable.
pub fn state_reason(&self, cur_ts: Timestamp) -> BucketEntryStateReason {
    let reason = self
        .punishment
        .map(BucketEntryStateReason::Punished)
        .or_else(|| self.check_dead(cur_ts).map(BucketEntryStateReason::Dead))
        .or_else(|| {
            self.check_unreliable(cur_ts)
                .map(BucketEntryStateReason::Unreliable)
        })
        .unwrap_or(BucketEntryStateReason::Reliable);
    self.state_stats_accounting
        .lock()
        .record_state_reason(cur_ts, reason);
    reason
}
pub fn state(&self, cur_ts: Timestamp) -> BucketEntryState {
self.state_reason(cur_ts).into()
}
/// Builds a `BucketEntrySnapshot` of this entry at `cur_ts`.
///
/// A per-routing-domain snapshot is included only for domains that have peer
/// info stored. The entry state is computed via `state`, which also records
/// it in the state statistics accounting.
pub(super) fn make_snapshot(
    &self,
    registry: VeilidComponentRegistry,
    entry: Arc<BucketEntry>,
    cur_ts: Timestamp,
) -> BucketEntrySnapshot {
    let mut routing_domain_snapshots = BTreeMap::new();

    let public_internet = &self.public_internet;
    if let Some(peer_info) = public_internet.peer_info.clone() {
        let snapshot = BucketEntryRoutingDomainSnapshotInner {
            peer_info,
            node_status: public_internet.node_status.clone(),
            last_seen_our_node_info_ts: public_internet.last_seen_our_node_info_ts,
        };
        routing_domain_snapshots.insert(RoutingDomain::PublicInternet, snapshot);
    }

    let local_network = &self.local_network;
    if let Some(peer_info) = local_network.peer_info.clone() {
        let snapshot = BucketEntryRoutingDomainSnapshotInner {
            peer_info,
            node_status: local_network.node_status.clone(),
            last_seen_our_node_info_ts: local_network.last_seen_our_node_info_ts,
        };
        routing_domain_snapshots.insert(RoutingDomain::LocalNetwork, snapshot);
    }

    let node_ref = NodeRef::new(registry, entry);
    let peer_stats = self.peer_stats.clone();
    let state = self.state(cur_ts);
    let node_ids = self.node_ids.clone();
    BucketEntrySnapshot::new(
        cur_ts,
        node_ref,
        peer_stats,
        state,
        node_ids,
        routing_domain_snapshots,
    )
}
/// Returns the geolocation info currently stored for this entry.
#[cfg(feature = "geolocation")]
pub fn geolocation_info(&self) -> &GeolocationInfo {
    let info = &self.geolocation_info;
    info
}
/// Sets or clears the punishment of this entry. Setting a punishment also
/// clears all recorded last flows.
pub fn set_punished(&mut self, punished: Option<PunishmentReason>) {
    let now_punished = punished.is_some();
    self.punishment = punished;
    if now_punished {
        self.clear_last_flows(DialInfoFilter::all());
    }
}
/// Returns the statistics kept for this peer.
pub fn peer_stats(&self) -> &PeerStats {
    let stats = &self.peer_stats;
    stats
}
/// Stores `status` as the latest node status for `routing_domain`.
pub fn update_node_status(&mut self, routing_domain: RoutingDomain, status: NodeStatus) {
    let slot = match routing_domain {
        RoutingDomain::LocalNetwork => &mut self.local_network.node_status,
        RoutingDomain::PublicInternet => &mut self.public_internet.node_status,
    };
    *slot = Some(status);
}
/// Records `seen_ts` as the last-seen timestamp of our node info for
/// `routing_domain`.
///
/// Returns the previous timestamp when the value changed, or `None` when
/// `seen_ts` was already stored.
pub fn set_seen_our_node_info_ts(
    &mut self,
    routing_domain: RoutingDomain,
    seen_ts: Timestamp,
) -> Option<Timestamp> {
    let slot = match routing_domain {
        RoutingDomain::LocalNetwork => &mut self.local_network.last_seen_our_node_info_ts,
        RoutingDomain::PublicInternet => &mut self.public_internet.last_seen_our_node_info_ts,
    };
    let previous = *slot;
    if previous != seen_ts {
        *slot = seen_ts;
        Some(previous)
    } else {
        None
    }
}
/// Returns whether the last-seen timestamp stored for `routing_domain` equals
/// `our_node_info_ts`.
pub fn has_seen_our_node_info_ts(
    &self,
    routing_domain: RoutingDomain,
    our_node_info_ts: Timestamp,
) -> bool {
    let last_seen = match routing_domain {
        RoutingDomain::LocalNetwork => self.local_network.last_seen_our_node_info_ts,
        RoutingDomain::PublicInternet => self.public_internet.last_seen_our_node_info_ts,
    };
    our_node_info_ts == last_seen
}
/// Clears the `updated_since_last_network_change` flag.
pub fn reset_updated_since_last_network_change(&mut self) {
    let flag = &mut self.updated_since_last_network_change;
    *flag = false;
}
/// Rolls the transfer accounting for the period `last_ts..cur_ts` into the
/// published transfer statistics.
pub(super) fn roll_transfers(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
    let transfer_stats = &mut self.peer_stats.transfer;
    self.transfer_stats_accounting
        .roll_transfers(last_ts, cur_ts, transfer_stats);
}
/// Feeds `latency` into the latency accounting and publishes the resulting
/// latency statistics.
fn record_latency(&mut self, latency: TimestampDuration) {
    let latency_stats = self.latency_stats_accounting.record_latency(latency);
    self.peer_stats.latency = Some(latency_stats);
}
/// Publishes the state statistics taken from the accounting, if it yields any.
pub(super) fn update_state_stats(&mut self) {
    let taken = self.state_stats_accounting.lock().take_stats();
    match taken {
        Some(state_stats) => self.peer_stats.state = state_stats,
        None => {}
    }
}
/// Rolls the unordered and ordered answer accounting at `cur_ts` and
/// publishes both results in the RPC statistics.
pub(super) fn roll_answer_stats(&mut self, cur_ts: Timestamp) {
    let unordered = self.answer_stats_accounting_unordered.roll_answers(cur_ts);
    let ordered = self.answer_stats_accounting_ordered.roll_answers(cur_ts);
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.answer_unordered = unordered;
    rpc_stats.answer_ordered = ordered;
}
/// Returns why the entry is unreliable at `cur_ts`, or `None` if it is not.
///
/// Checked in order: any recorded send failure, too many recent lost answers
/// (unordered or ordered), never seen consecutively, and seen consecutively
/// for less than `UNRELIABLE_PING_SPAN_SECS`.
pub(super) fn check_unreliable(
    &self,
    cur_ts: Timestamp,
) -> Option<BucketEntryUnreliableReason> {
    let rpc_stats = &self.peer_stats.rpc_stats;

    if rpc_stats.failed_to_send > 0 {
        return Some(BucketEntryUnreliableReason::FailedToSend);
    }

    let lost_unordered =
        rpc_stats.recent_lost_answers_unordered > UNRELIABLE_LOST_ANSWERS_UNORDERED;
    let lost_ordered = rpc_stats.recent_lost_answers_ordered > UNRELIABLE_LOST_ANSWERS_ORDERED;
    if lost_unordered || lost_ordered {
        return Some(BucketEntryUnreliableReason::LostAnswers);
    }

    let Some(first_seen_ts) = rpc_stats.first_consecutive_seen_ts else {
        return Some(BucketEntryUnreliableReason::NotSeenConsecutively);
    };

    let span = TimestampDuration::new_secs(UNRELIABLE_PING_SPAN_SECS);
    if cur_ts.duration_since(first_seen_ts) >= span {
        None
    } else {
        Some(BucketEntryUnreliableReason::InUnreliablePingSpan)
    }
}
/// Returns why the entry is dead at `cur_ts`, or `None` if it is not.
///
/// - `CanNotSend`: send failures reached `NEVER_SEEN_PING_COUNT`.
/// - `TooManyLostAnswers`: never seen, and recent lost answers (ordered plus
///   unordered) reached `NEVER_SEEN_PING_COUNT`.
/// - `NoPingResponse`: last seen at least `UNRELIABLE_PING_SPAN_SECS` ago and
///   recent lost answers reached the number of pings that fit in that span.
pub(super) fn check_dead(&self, cur_ts: Timestamp) -> Option<BucketEntryDeadReason> {
    let rpc_stats = &self.peer_stats.rpc_stats;

    if rpc_stats.failed_to_send >= NEVER_SEEN_PING_COUNT {
        return Some(BucketEntryDeadReason::CanNotSend);
    }

    let lost_answers =
        rpc_stats.recent_lost_answers_unordered + rpc_stats.recent_lost_answers_ordered;

    match rpc_stats.last_seen_ts {
        None if lost_answers >= NEVER_SEEN_PING_COUNT => {
            Some(BucketEntryDeadReason::TooManyLostAnswers)
        }
        Some(last_seen_ts)
            if cur_ts.duration_since(last_seen_ts)
                >= TimestampDuration::new_secs(UNRELIABLE_PING_SPAN_SECS)
                && lost_answers >= (UNRELIABLE_PING_SPAN_SECS / UNRELIABLE_PING_INTERVAL_SECS) =>
        {
            Some(BucketEntryDeadReason::NoPingResponse)
        }
        _ => None,
    }
}
/// Marks the node as seen at `ts`: always updates `last_seen_ts`, and starts
/// the consecutive-seen streak at `ts` if no streak is running.
pub(super) fn touch_last_seen(&mut self, ts: Timestamp) {
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.first_consecutive_seen_ts = rpc_stats.first_consecutive_seen_ts.or(Some(ts));
    rpc_stats.last_seen_ts = Some(ts);
}
/// If the entry is dead at `cur_ts`, resets the seen timestamps, send failures
/// and lost-answer counters so that it is no longer considered dead.
///
/// # Panics
/// Panics if the entry is still dead after the reset.
pub(super) fn make_not_dead(&mut self, cur_ts: Timestamp) {
    if self.check_dead(cur_ts).is_none() {
        return;
    }
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.last_seen_ts = None;
    rpc_stats.first_consecutive_seen_ts = None;
    rpc_stats.failed_to_send = 0;
    rpc_stats.recent_lost_answers_unordered = 0;
    rpc_stats.recent_lost_answers_ordered = 0;
    assert!(self.check_dead(cur_ts).is_none());
}
/// Formats the state reason and the two seen timestamps (relative to
/// `cur_ts`) for debugging. Computing the state reason also records it in
/// the state statistics accounting.
pub(super) fn _state_debug_info(&self, cur_ts: Timestamp) -> String {
    let describe = |opt_ts: Option<Timestamp>| match opt_ts {
        Some(ts) => format!("{} ago", cur_ts.duration_since(ts)),
        None => "never".to_owned(),
    };
    let rpc_stats = &self.peer_stats.rpc_stats;
    let first_consecutive_seen = describe(rpc_stats.first_consecutive_seen_ts);
    let last_seen = describe(rpc_stats.last_seen_ts);
    let reason = self.state_reason(cur_ts);

    format!(
        "state: {:?}, first_consecutive_seen_ts: {}, last_seen_ts: {}",
        reason, first_consecutive_seen, last_seen
    )
}
/// Accounts for a question sent to this node at `ts`.
///
/// Adds `bytes` to the upload accounting, records the question in the answer
/// accounting for `ordering`, counts the message and clears the send-failure
/// counter. When an answer is expected, also bumps the in-flight question
/// count and records the question time.
pub(super) fn question_sent(
    &mut self,
    ts: Timestamp,
    bytes: ByteCount,
    expects_answer: bool,
    ordering: SequenceOrdering,
) {
    self.transfer_stats_accounting.add_up(bytes);

    let answer_accounting = match ordering {
        SequenceOrdering::Ordered => &mut self.answer_stats_accounting_ordered,
        SequenceOrdering::Unordered => &mut self.answer_stats_accounting_unordered,
    };
    answer_accounting.record_question(ts);

    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.messages_sent += 1;
    rpc_stats.failed_to_send = 0;
    if expects_answer {
        rpc_stats.questions_in_flight += 1;
        rpc_stats.last_question_ts = Some(ts);
    }
}
/// Accounts for a question received from this node at `ts`: adds `bytes` to
/// the download accounting, counts the message and marks the node as seen.
pub(super) fn question_rcvd(&mut self, ts: Timestamp, bytes: ByteCount) {
    self.transfer_stats_accounting.add_down(bytes);
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.messages_rcvd += 1;
    self.touch_last_seen(ts);
}
/// Accounts for an answer sent to this node: adds `bytes` to the upload
/// accounting, counts the message and clears the send-failure counter.
pub(super) fn answer_sent(&mut self, bytes: ByteCount) {
    self.transfer_stats_accounting.add_up(bytes);
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.messages_sent += 1;
    rpc_stats.failed_to_send = 0;
}
/// Accounts for an answer received from this node.
///
/// Adds `bytes` to the download accounting, records the answer in the answer
/// accounting for `ordering` and clears that ordering's recent-lost-answer
/// counter, counts the message, records the round-trip latency
/// (`recv_ts - send_ts`) and marks the node as seen at `recv_ts`.
///
/// The in-flight question counter is decremented with saturation, so an
/// answer that arrives while the counter is already zero cannot underflow it
/// (which would panic in debug builds and wrap in release builds).
pub(super) fn answer_rcvd(
    &mut self,
    send_ts: Timestamp,
    recv_ts: Timestamp,
    bytes: ByteCount,
    ordering: SequenceOrdering,
) {
    self.transfer_stats_accounting.add_down(bytes);
    match ordering {
        SequenceOrdering::Ordered => {
            self.answer_stats_accounting_ordered.record_answer(recv_ts);
            self.peer_stats.rpc_stats.recent_lost_answers_ordered = 0;
        }
        SequenceOrdering::Unordered => {
            self.answer_stats_accounting_unordered
                .record_answer(recv_ts);
            self.peer_stats.rpc_stats.recent_lost_answers_unordered = 0;
        }
    }
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.messages_rcvd += 1;
    rpc_stats.questions_in_flight = rpc_stats.questions_in_flight.saturating_sub(1);
    self.record_latency(recv_ts.duration_since(send_ts));
    self.touch_last_seen(recv_ts);
}
/// Accounts for an answer that never arrived.
///
/// Records the loss in the answer accounting for `ordering` at the current
/// time and bumps that ordering's recent-lost-answer counter; once the
/// counter exceeds the ordering's tolerance the consecutive-seen streak is
/// reset.
///
/// The in-flight question counter is decremented with saturation, so a loss
/// reported while the counter is already zero cannot underflow it (which
/// would panic in debug builds and wrap in release builds).
pub(super) fn lost_answer(&mut self, ordering: SequenceOrdering) {
    let cur_ts = Timestamp::now();
    match ordering {
        SequenceOrdering::Ordered => {
            self.answer_stats_accounting_ordered
                .record_lost_answer(cur_ts);
            self.peer_stats.rpc_stats.recent_lost_answers_ordered += 1;
            if self.peer_stats.rpc_stats.recent_lost_answers_ordered
                > UNRELIABLE_LOST_ANSWERS_ORDERED
            {
                self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
            }
        }
        SequenceOrdering::Unordered => {
            self.answer_stats_accounting_unordered
                .record_lost_answer(cur_ts);
            self.peer_stats.rpc_stats.recent_lost_answers_unordered += 1;
            if self.peer_stats.rpc_stats.recent_lost_answers_unordered
                > UNRELIABLE_LOST_ANSWERS_UNORDERED
            {
                self.peer_stats.rpc_stats.first_consecutive_seen_ts = None;
            }
        }
    }
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    rpc_stats.questions_in_flight = rpc_stats.questions_in_flight.saturating_sub(1);
}
/// Accounts for a message that could not be sent at `ts`: bumps the
/// send-failure counter and resets the consecutive-seen streak. When an
/// answer was expected, the question time is recorded as well.
pub(super) fn failed_to_send(&mut self, ts: Timestamp, expects_answer: bool) {
    let rpc_stats = &mut self.peer_stats.rpc_stats;
    if expects_answer {
        rpc_stats.last_question_ts = Some(ts);
    }
    rpc_stats.failed_to_send += 1;
    rpc_stats.first_consecutive_seen_ts = None;
}
/// Stores `sender_info` under `key`.
///
/// Returns the previously stored sender info when there was one and it
/// differs from `sender_info`; otherwise returns `None`.
pub(super) fn report_sender_info(
    &mut self,
    key: LastSenderInfoKey,
    sender_info: SenderInfo,
) -> Option<SenderInfo> {
    let previous = self.last_sender_info.insert(key, sender_info);
    previous.filter(|prev| *prev != sender_info)
}
}
/// A routing table bucket entry: the lock-protected entry state plus a
/// reference count that must be zero when the entry is dropped.
#[derive(Debug)]
pub(crate) struct BucketEntry {
// Checked in `Drop`; a non-zero value there causes a panic.
// NOTE(review): presumably maintained by the node reference types — confirm.
pub(super) ref_count: AtomicU32,
// Entry state; accessed through `with` (read lock) and `with_mut` (write lock).
inner: RwLock<BucketEntryInner>,
}
impl BucketEntry {
    /// Creates an empty entry for `first_node_id`, with no peer info, flows or
    /// statistics, and `time_added` set to the current time.
    ///
    /// # Panics
    /// Panics if the kind of `first_node_id` is not in `VALID_CRYPTO_KINDS`.
    pub(super) fn new(first_node_id: NodeId) -> Self {
        assert!(VALID_CRYPTO_KINDS.contains(&first_node_id.kind()));

        let time_added = Timestamp::now();
        let peer_stats = PeerStats {
            time_added,
            rpc_stats: RPCStats::default(),
            latency: None,
            transfer: TransferStatsDownUp::default(),
            state: StateStats::default(),
        };

        Self::new_with_inner(BucketEntryInner {
            node_ids: NodeIdGroup::from(first_node_id),
            envelope_support: Vec::new(),
            updated_since_last_network_change: false,
            last_flows: BTreeMap::new(),
            last_sender_info: HashMap::new(),
            local_network: BucketEntryLocalNetwork {
                last_seen_our_node_info_ts: Timestamp::new(0u64),
                peer_info: None,
                node_status: None,
            },
            public_internet: BucketEntryPublicInternet {
                last_seen_our_node_info_ts: Timestamp::new(0u64),
                peer_info: None,
                node_status: None,
            },
            #[cfg(feature = "geolocation")]
            geolocation_info: Default::default(),
            peer_stats,
            latency_stats_accounting: LatencyStatsAccounting::new(),
            transfer_stats_accounting: TransferStatsAccounting::new(),
            state_stats_accounting: Mutex::new(StateStatsAccounting::new()),
            answer_stats_accounting_ordered: AnswerStatsAccounting::new(),
            answer_stats_accounting_unordered: AnswerStatsAccounting::new(),
            punishment: None,
            #[cfg(feature = "tracking")]
            next_track_id: 0,
            #[cfg(feature = "tracking")]
            node_ref_tracks: HashMap::new(),
        })
    }

    /// Wraps an existing inner state in a new entry with a zero reference count.
    pub(super) fn new_with_inner(inner: BucketEntryInner) -> Self {
        let ref_count = AtomicU32::new(0);
        let inner = RwLock::new(inner);
        Self { ref_count, inner }
    }

    /// Takes a snapshot of the entry at `cur_ts` while holding the read lock.
    pub fn snapshot(
        self: &Arc<Self>,
        registry: VeilidComponentRegistry,
        cur_ts: Timestamp,
    ) -> BucketEntrySnapshot {
        let entry = Arc::clone(self);
        let guard = self.inner.read();
        guard.make_snapshot(registry, entry, cur_ts)
    }

    /// Runs `f` with shared access to the inner state, under the read lock.
    pub fn with<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&BucketEntryInner) -> R,
    {
        let guard = self.inner.read();
        f(&*guard)
    }

    /// Runs `f` with exclusive access to the inner state, under the write lock.
    pub fn with_mut<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BucketEntryInner) -> R,
    {
        let mut guard = self.inner.write();
        f(&mut *guard)
    }
}
/// Enforces that an entry is never dropped while references to it are still
/// counted.
///
/// # Panics
/// Panics when `ref_count` is non-zero at drop time. With the `tracking`
/// feature enabled, the backtraces recorded by `track` are resolved and
/// logged first.
impl Drop for BucketEntry {
    fn drop(&mut self) {
        if self.ref_count.load(Ordering::Acquire) != 0 {
            #[cfg(feature = "tracking")]
            {
                veilid_log!(self info "NodeRef Tracking");
                // The recorded backtraces live in the inner entry state, not on
                // `BucketEntry` itself, so they have to be reached through the lock.
                // The guard is scoped to this block so that it is released before
                // the panic message below takes the read lock.
                // NOTE(review): `veilid_log!(self ...)` is kept as written; confirm
                // that `BucketEntry` satisfies whatever the macro requires of `self`.
                let mut inner = self.inner.write();
                for (id, bt) in &mut inner.node_ref_tracks {
                    bt.resolve();
                    veilid_log!(self info "Id: {}\n----------------\n{:#?}", id, bt);
                }
            }
            panic!(
                "bucket entry dropped with non-zero refcount: {:#?}",
                &*self.inner.read()
            )
        }
    }
}