use std::{
collections::{BTreeMap, HashMap},
net::SocketAddr,
time::{Duration, Instant},
};
use iroh_base::key::NodeId;
use tracing::{debug, event, Level};
use super::{
node_state::{ControlMsg, PongReply, SESSION_ACTIVE_TIMEOUT},
IpPort, PingRole, Source,
};
use crate::{disco::SendAddr, magicsock::HEARTBEAT_INTERVAL, stun};
const DISCO_PING_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct PathState {
node_id: NodeId,
path: SendAddr,
pub(super) last_ping: Option<Instant>,
last_got_ping: Option<(Instant, stun::TransactionId)>,
pub(super) call_me_maybe_time: Option<Instant>,
pub(super) recent_pong: Option<PongReply>,
pub(super) last_payload_msg: Option<Instant>,
pub(super) sources: HashMap<Source, Instant>,
}
impl PathState {
pub(super) fn new(node_id: NodeId, path: SendAddr, source: Source, now: Instant) -> Self {
let mut sources = HashMap::new();
sources.insert(source, now);
Self {
node_id,
path,
last_ping: None,
last_got_ping: None,
call_me_maybe_time: None,
recent_pong: None,
last_payload_msg: None,
sources,
}
}
pub(super) fn udp_addr(&self) -> Option<SocketAddr> {
match self.path {
SendAddr::Udp(addr) => Some(addr),
SendAddr::Relay(_) => None,
}
}
pub(super) fn with_last_payload(
node_id: NodeId,
path: SendAddr,
source: Source,
now: Instant,
) -> Self {
let mut sources = HashMap::new();
sources.insert(source, now);
PathState {
node_id,
path,
last_ping: None,
last_got_ping: None,
call_me_maybe_time: None,
recent_pong: None,
last_payload_msg: Some(now),
sources,
}
}
pub(super) fn with_ping(
node_id: NodeId,
path: SendAddr,
tx_id: stun::TransactionId,
source: Source,
now: Instant,
) -> Self {
let mut new = PathState::new(node_id, path, source, now);
new.handle_ping(tx_id, now);
new
}
pub(super) fn add_pong_reply(&mut self, r: PongReply) {
if let SendAddr::Udp(ref path) = self.path {
if self.recent_pong.is_none() {
event!(
target: "events.net.holepunched",
Level::DEBUG,
remote_node = %self.node_id.fmt_short(),
path = ?path,
direction = "outgoing",
);
}
}
self.recent_pong = Some(r);
}
#[cfg(test)]
pub(super) fn with_pong_reply(node_id: NodeId, r: PongReply) -> Self {
PathState {
node_id,
path: r.from.clone(),
last_ping: None,
last_got_ping: None,
call_me_maybe_time: None,
recent_pong: Some(r),
last_payload_msg: None,
sources: HashMap::new(),
}
}
pub(super) fn is_active(&self) -> bool {
self.last_payload_msg
.as_ref()
.map(|instant| instant.elapsed() <= SESSION_ACTIVE_TIMEOUT)
.unwrap_or(false)
}
pub(super) fn last_incoming_ping(&self) -> Option<&Instant> {
self.last_got_ping.as_ref().map(|(time, _tx_id)| time)
}
pub(super) fn last_alive(&self) -> Option<Instant> {
self.recent_pong
.as_ref()
.map(|pong| &pong.pong_at)
.into_iter()
.chain(self.last_payload_msg.as_ref())
.chain(self.call_me_maybe_time.as_ref())
.chain(self.last_incoming_ping())
.max()
.copied()
}
pub(super) fn last_control_msg(&self, now: Instant) -> Option<(Duration, ControlMsg)> {
let last_pong = self
.recent_pong
.as_ref()
.map(|pong| (pong.pong_at, ControlMsg::Pong));
let last_call_me_maybe = self
.call_me_maybe_time
.as_ref()
.map(|call_me| (*call_me, ControlMsg::CallMeMaybe));
let last_ping = self
.last_incoming_ping()
.map(|ping| (*ping, ControlMsg::Ping));
last_pong
.into_iter()
.chain(last_call_me_maybe)
.chain(last_ping)
.max_by_key(|(instant, _kind)| *instant)
.map(|(instant, kind)| (now.duration_since(instant), kind))
}
pub(super) fn latency(&self) -> Option<Duration> {
self.recent_pong.as_ref().map(|p| p.latency)
}
pub(super) fn needs_ping(&self, now: &Instant) -> bool {
match self.last_ping {
None => true,
Some(last_ping) => {
let elapsed = now.duration_since(last_ping);
elapsed > DISCO_PING_INTERVAL
}
}
}
pub(super) fn handle_ping(&mut self, tx_id: stun::TransactionId, now: Instant) -> PingRole {
if Some(&tx_id) == self.last_got_ping.as_ref().map(|(_t, tx_id)| tx_id) {
PingRole::Duplicate
} else {
let prev = self.last_got_ping.replace((now, tx_id));
let heartbeat_deadline = HEARTBEAT_INTERVAL + (HEARTBEAT_INTERVAL / 2);
match prev {
Some((prev_time, _tx)) if now.duration_since(prev_time) <= heartbeat_deadline => {
PingRole::LikelyHeartbeat
}
Some((prev_time, _tx)) => {
debug!(
elapsed = ?now.duration_since(prev_time),
"heartbeat missed, reactivating",
);
PingRole::Activate
}
None => {
if let SendAddr::Udp(ref addr) = self.path {
event!(
target: "events.net.holepunched",
Level::DEBUG,
remote_node = %self.node_id.fmt_short(),
path = ?addr,
direction = "incoming",
);
}
PingRole::Activate
}
}
}
}
pub(super) fn add_source(&mut self, source: Source, now: Instant) {
self.sources.insert(source, now);
}
pub(super) fn clear(&mut self) {
self.last_ping = None;
self.last_got_ping = None;
self.call_me_maybe_time = None;
self.recent_pong = None;
}
fn summary(&self, mut w: impl std::fmt::Write) -> std::fmt::Result {
write!(w, "{{ ")?;
if self.is_active() {
write!(w, "active ")?;
}
if let Some(ref pong) = self.recent_pong {
write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?;
}
if let Some(when) = self.last_incoming_ping() {
write!(w, "ping-received({:?} ago) ", when.elapsed())?;
}
if let Some(ref when) = self.last_ping {
write!(w, "ping-sent({:?} ago) ", when.elapsed())?;
}
if let Some(last_source) = self.sources.iter().max_by_key(|&(_, instant)| instant) {
write!(
w,
"last-source: {}({:?} ago)",
last_source.0,
last_source.1.elapsed()
)?;
}
write!(w, "}}")
}
}
pub(super) fn summarize_node_paths(paths: &BTreeMap<IpPort, PathState>) -> String {
use std::fmt::Write;
let mut w = String::new();
write!(&mut w, "[").ok();
for (i, (ipp, state)) in paths.iter().enumerate() {
if i > 0 {
write!(&mut w, ", ").ok();
}
write!(&mut w, "{ipp}").ok();
state.summary(&mut w).ok();
}
write!(&mut w, "]").ok();
w
}