use std::{
collections::{BTreeSet, HashSet},
fmt::{self, Debug, Display, Formatter},
net::SocketAddr,
sync::atomic::Ordering,
time::{Duration, SystemTime},
};
use casper_types::{DisplayIter, EraId, PublicKey};
use serde::Serialize;
use crate::{
types::NodeId,
utils::{opt_display::OptDisplay, TimeAnchor},
};
use super::{
error::ConnectionError, outgoing::OutgoingState, symmetry::ConnectionSymmetry, Network,
OutgoingHandle, Payload,
};
#[derive(Debug, Serialize)]
pub(crate) struct NetworkInsights {
our_id: NodeId,
network_ca: bool,
public_addr: Option<SocketAddr>,
is_syncing: bool,
net_active_era: EraId,
privileged_active_outgoing_nodes: Option<HashSet<PublicKey>>,
privileged_upcoming_outgoing_nodes: Option<HashSet<PublicKey>>,
unspent_bandwidth_allowance_bytes: Option<i64>,
outgoing_connections: Vec<(SocketAddr, OutgoingInsight)>,
connection_symmetries: Vec<(NodeId, ConnectionSymmetryInsight)>,
}
#[derive(Debug, Serialize)]
struct OutgoingInsight {
unforgettable: bool,
state: OutgoingStateInsight,
}
#[derive(Debug, Serialize)]
enum OutgoingStateInsight {
Connecting {
failures_so_far: u8,
since: SystemTime,
},
Waiting {
failures_so_far: u8,
error: Option<String>,
last_failure: SystemTime,
},
Connected {
peer_id: NodeId,
peer_addr: SocketAddr,
last_ping_sent: Option<SystemTime>,
last_pong_received: Option<SystemTime>,
invalid_pong_count: u32,
rtt: Option<Duration>,
},
Blocked {
since: SystemTime,
justification: String,
until: SystemTime,
},
Loopback,
}
fn time_delta(now: SystemTime, then: SystemTime) -> impl Display {
OptDisplay::new(
now.duration_since(then)
.map(humantime::format_duration)
.ok(),
"err",
)
}
impl OutgoingStateInsight {
fn from_outgoing_state<P>(
anchor: &TimeAnchor,
state: &OutgoingState<OutgoingHandle<P>, ConnectionError>,
) -> Self {
match state {
OutgoingState::Connecting {
failures_so_far,
since,
} => OutgoingStateInsight::Connecting {
failures_so_far: *failures_so_far,
since: anchor.convert(*since),
},
OutgoingState::Waiting {
failures_so_far,
error,
last_failure,
} => OutgoingStateInsight::Waiting {
failures_so_far: *failures_so_far,
error: error.as_ref().map(ToString::to_string),
last_failure: anchor.convert(*last_failure),
},
OutgoingState::Connected {
peer_id,
handle,
health,
} => OutgoingStateInsight::Connected {
peer_id: *peer_id,
peer_addr: handle.peer_addr,
last_ping_sent: health
.last_ping_sent
.map(|tt| anchor.convert(tt.timestamp())),
last_pong_received: health
.last_pong_received
.map(|tt| anchor.convert(tt.timestamp())),
invalid_pong_count: health.invalid_pong_count,
rtt: health.calc_rrt(),
},
OutgoingState::Blocked {
since,
justification,
until,
} => OutgoingStateInsight::Blocked {
since: anchor.convert(*since),
justification: justification.to_string(),
until: anchor.convert(*until),
},
OutgoingState::Loopback => OutgoingStateInsight::Loopback,
}
}
fn fmt_time_relative(&self, now: SystemTime, f: &mut Formatter<'_>) -> fmt::Result {
match self {
OutgoingStateInsight::Connecting {
failures_so_far,
since,
} => write!(
f,
"connecting (fails: {}), since {}",
failures_so_far,
time_delta(now, *since)
),
OutgoingStateInsight::Waiting {
failures_so_far,
error,
last_failure,
} => write!(
f,
"waiting (fails: {}, last error: {}), since {}",
failures_so_far,
OptDisplay::new(error.as_ref(), "none"),
time_delta(now, *last_failure)
),
OutgoingStateInsight::Connected {
peer_id,
peer_addr,
last_ping_sent,
last_pong_received,
invalid_pong_count,
rtt,
} => {
let rtt_ms = rtt.map(|duration| duration.as_millis());
write!(
f,
"connected -> {} @ {} (rtt {}, invalid {}, last ping/pong {}/{})",
peer_id,
peer_addr,
OptDisplay::new(rtt_ms, "?"),
invalid_pong_count,
OptDisplay::new(last_ping_sent.map(|t| time_delta(now, t)), "-"),
OptDisplay::new(last_pong_received.map(|t| time_delta(now, t)), "-"),
)
}
OutgoingStateInsight::Blocked {
since,
justification,
until,
} => {
write!(
f,
"blocked since {}, until {}: {}",
time_delta(now, *since),
time_delta(now, *until),
justification
)
}
OutgoingStateInsight::Loopback => f.write_str("loopback"),
}
}
}
#[derive(Debug, Serialize)]
pub(super) enum ConnectionSymmetryInsight {
IncomingOnly {
since: SystemTime,
peer_addrs: BTreeSet<SocketAddr>,
},
OutgoingOnly {
since: SystemTime,
},
Symmetric {
peer_addrs: BTreeSet<SocketAddr>,
},
Gone,
}
impl ConnectionSymmetryInsight {
fn from_connection_symmetry(anchor: &TimeAnchor, sym: &ConnectionSymmetry) -> Self {
match sym {
ConnectionSymmetry::IncomingOnly { since, peer_addrs } => {
ConnectionSymmetryInsight::IncomingOnly {
since: anchor.convert(*since),
peer_addrs: peer_addrs.clone(),
}
}
ConnectionSymmetry::OutgoingOnly { since } => ConnectionSymmetryInsight::OutgoingOnly {
since: anchor.convert(*since),
},
ConnectionSymmetry::Symmetric { peer_addrs } => ConnectionSymmetryInsight::Symmetric {
peer_addrs: peer_addrs.clone(),
},
ConnectionSymmetry::Gone => ConnectionSymmetryInsight::Gone,
}
}
fn fmt_time_relative(&self, now: SystemTime, f: &mut Formatter<'_>) -> fmt::Result {
match self {
ConnectionSymmetryInsight::IncomingOnly { since, peer_addrs } => write!(
f,
"<- {} (since {})",
DisplayIter::new(peer_addrs.iter()),
time_delta(now, *since)
),
ConnectionSymmetryInsight::OutgoingOnly { since } => {
write!(f, "-> (since {})", time_delta(now, *since))
}
ConnectionSymmetryInsight::Symmetric { peer_addrs } => {
write!(f, "<> {}", DisplayIter::new(peer_addrs.iter()))
}
ConnectionSymmetryInsight::Gone => f.write_str("gone"),
}
}
}
impl NetworkInsights {
pub(super) fn collect_from_component<REv, P>(net: &Network<REv, P>) -> Self
where
P: Payload,
{
let (privileged_active_outgoing_nodes, privileged_upcoming_outgoing_nodes) = net
.outgoing_limiter
.debug_inspect_validators(&net.active_era)
.map(|(a, b)| (Some(a), Some(b)))
.unwrap_or_default();
let anchor = TimeAnchor::now();
let outgoing_connections = net
.outgoing_manager
.outgoing
.iter()
.map(|(addr, outgoing)| {
let state = OutgoingStateInsight::from_outgoing_state(&anchor, &outgoing.state);
(
*addr,
OutgoingInsight {
unforgettable: outgoing.is_unforgettable,
state,
},
)
})
.collect();
let connection_symmetries = net
.connection_symmetries
.iter()
.map(|(id, sym)| {
(
*id,
ConnectionSymmetryInsight::from_connection_symmetry(&anchor, sym),
)
})
.collect();
NetworkInsights {
our_id: net.context.our_id(),
network_ca: net.context.network_ca().is_some(),
public_addr: net.context.public_addr(),
is_syncing: net.context.is_syncing().load(Ordering::Relaxed),
net_active_era: net.active_era,
privileged_active_outgoing_nodes,
privileged_upcoming_outgoing_nodes,
unspent_bandwidth_allowance_bytes: net
.outgoing_limiter
.debug_inspect_unspent_allowance(),
outgoing_connections,
connection_symmetries,
}
}
}
impl Display for NetworkInsights {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let now = SystemTime::now();
if !self.network_ca {
f.write_str("Public ")?;
} else {
f.write_str("Private ")?;
}
writeln!(
f,
"node {} @ {:?} (syncing: {})",
self.our_id, self.public_addr, self.is_syncing
)?;
writeln!(
f,
"active era: {} unspent_bandwidth_allowance_bytes: {}",
self.net_active_era,
OptDisplay::new(self.unspent_bandwidth_allowance_bytes, "inactive"),
)?;
let active = self
.privileged_active_outgoing_nodes
.as_ref()
.map(HashSet::iter)
.map(DisplayIter::new);
writeln!(
f,
"privileged active: {}",
OptDisplay::new(active, "inactive")
)?;
let upcoming = self
.privileged_upcoming_outgoing_nodes
.as_ref()
.map(HashSet::iter)
.map(DisplayIter::new);
writeln!(
f,
"privileged upcoming: {}",
OptDisplay::new(upcoming, "inactive")
)?;
f.write_str("outgoing connections:\n")?;
writeln!(f, "address uf state")?;
for (addr, outgoing) in &self.outgoing_connections {
write!(f, "{:23} {:5} ", addr, outgoing.unforgettable,)?;
outgoing.state.fmt_time_relative(now, f)?;
f.write_str("\n")?;
}
f.write_str("connection symmetries:\n")?;
writeln!(f, "peer ID symmetry")?;
for (peer_id, symmetry) in &self.connection_symmetries {
write!(f, "{:10} ", peer_id)?;
symmetry.fmt_time_relative(now, f)?;
f.write_str("\n")?;
}
Ok(())
}
}