use crate::NodeAddr;
use crate::mmp::report::ReceiverReport;
use crate::mmp::{MAX_SESSION_REPORT_INTERVAL_MS, MIN_SESSION_REPORT_INTERVAL_MS};
use crate::node::session::{EndToEndState, SessionEntry};
use crate::node::session_wire::{
FSP_COMMON_PREFIX_SIZE, FSP_FLAG_CP, FSP_FLAG_K, FSP_HEADER_SIZE, FSP_INNER_HEADER_SIZE,
FSP_PHASE_ESTABLISHED, FSP_PHASE_MSG1, FSP_PHASE_MSG2, FSP_PHASE_MSG3, FSP_PORT_HEADER_SIZE,
FSP_PORT_IPV6_SHIM, FspCommonPrefix, FspEncryptedHeader, build_fsp_header,
fsp_prepend_inner_header, fsp_strip_inner_header, parse_encrypted_coords,
};
use crate::node::wire::{
ESTABLISHED_HEADER_SIZE, FLAG_KEY_EPOCH, FLAG_SP, build_established_header,
};
use crate::node::{Node, NodeEndpointCommand, NodeEndpointEvent, NodeEndpointPeer, NodeError};
use crate::noise::{
HandshakeState, XK_HANDSHAKE_MSG1_SIZE, XK_HANDSHAKE_MSG2_SIZE, XK_HANDSHAKE_MSG3_SIZE,
};
use crate::protocol::{
CoordsRequired, FspInnerFlags, LinkMessageType, MtuExceeded, PathBroken, PathMtuNotification,
SESSION_DATAGRAM_HEADER_SIZE, SessionAck, SessionDatagram, SessionMessageType, SessionMsg3,
SessionReceiverReport, SessionSenderReport, SessionSetup,
};
use crate::protocol::{coords_wire_size, encode_coords};
use crate::transport::TransportHandle;
use crate::upper::icmp::FIPS_OVERHEAD;
use secp256k1::PublicKey;
use tracing::{debug, info, trace, warn};
enum FspFrameOutcome {
Authentic {
plaintext: Vec<u8>,
msg_type: u8,
inner_flags_byte: u8,
timestamp: u32,
},
UnknownSession,
NotEstablished,
BadInnerHeader,
DecryptFailed {
error: crate::noise::NoiseError,
counter: u64,
consecutive: u32,
recover_session: bool,
},
}
struct PipelinedEndpointSend<'a> {
dest_addr: &'a NodeAddr,
payload: &'a [u8],
now_ms: u64,
timestamp: u32,
fsp_flags: u8,
inner_plaintext: &'a [u8],
my_coords: Option<&'a crate::tree::TreeCoordinate>,
dest_coords: Option<&'a crate::tree::TreeCoordinate>,
}
const DECRYPT_FAILURE_RECOVERY_THRESHOLD: u32 = 32;
fn pending_rekey_wins_tiebreak(
our_addr: &NodeAddr,
peer_addr: &NodeAddr,
existing: &SessionEntry,
) -> bool {
existing.pending_new_session().is_some()
&& existing.is_rekey_initiator()
&& our_addr < peer_addr
}
fn should_start_decrypt_failure_rekey(entry: &SessionEntry, consecutive: u32) -> bool {
consecutive >= DECRYPT_FAILURE_RECOVERY_THRESHOLD
&& entry.is_established()
&& !entry.has_rekey_in_progress()
&& entry.pending_new_session().is_none()
}
impl Node {
pub(in crate::node) async fn handle_session_payload(
&mut self,
src_addr: &NodeAddr,
payload: &[u8],
path_mtu: u16,
ce_flag: bool,
previous_hop: Option<NodeAddr>,
) {
let prefix = match FspCommonPrefix::parse(payload) {
Some(p) => p,
None => {
debug!(
len = payload.len(),
"Session payload too short for FSP prefix"
);
return;
}
};
let inner = &payload[FSP_COMMON_PREFIX_SIZE..];
match prefix.phase {
FSP_PHASE_MSG1 => {
self.handle_session_setup(src_addr, inner).await;
}
FSP_PHASE_MSG2 => {
self.handle_session_ack(src_addr, inner).await;
}
FSP_PHASE_MSG3 => {
self.handle_session_msg3(src_addr, inner).await;
}
FSP_PHASE_ESTABLISHED if prefix.is_unencrypted() => {
if inner.is_empty() {
debug!("Empty plaintext error signal");
return;
}
let error_type = inner[0];
let error_body = &inner[1..];
match SessionMessageType::from_byte(error_type) {
Some(SessionMessageType::CoordsRequired) => {
self.handle_coords_required(error_body).await;
}
Some(SessionMessageType::PathBroken) => {
self.handle_path_broken(error_body).await;
}
Some(SessionMessageType::MtuExceeded) => {
self.handle_mtu_exceeded(error_body).await;
}
_ => {
debug!(error_type, "Unknown plaintext error signal type");
}
}
}
FSP_PHASE_ESTABLISHED => {
self.handle_encrypted_session_msg(
src_addr,
payload,
path_mtu,
ce_flag,
previous_hop,
)
.await;
}
_ => {
debug!(phase = prefix.phase, "Unknown FSP phase");
}
}
}
async fn handle_encrypted_session_msg(
&mut self,
src_addr: &NodeAddr,
payload: &[u8],
path_mtu: u16,
ce_flag: bool,
previous_hop: Option<NodeAddr>,
) {
let _t_fsp_handle =
crate::perf_profile::Timer::start(crate::perf_profile::Stage::FspHandle);
let header = match FspEncryptedHeader::parse(payload) {
Some(h) => h,
None => {
debug!(
len = payload.len(),
"Encrypted session message too short for FSP header"
);
return;
}
};
let mut ciphertext_offset = FSP_HEADER_SIZE;
if header.has_coords() {
let coord_data = &payload[FSP_HEADER_SIZE..];
match parse_encrypted_coords(coord_data) {
Ok((src_coords, dest_coords, bytes_consumed)) => {
let now_ms = Self::now_ms();
if let Some(coords) = src_coords {
self.coord_cache.insert(*src_addr, coords, now_ms);
}
if let Some(coords) = dest_coords {
self.coord_cache.insert(*self.node_addr(), coords, now_ms);
}
ciphertext_offset += bytes_consumed;
}
Err(e) => {
debug!(error = %e, "Failed to parse coords from encrypted session message");
return;
}
}
}
let ciphertext = &payload[ciphertext_offset..];
let received_k_bit = header.flags & FSP_FLAG_K != 0;
let outcome: FspFrameOutcome = 'outcome: {
let entry = match self.sessions.get_mut(src_addr) {
Some(e) => e,
None => break 'outcome FspFrameOutcome::UnknownSession,
};
if !entry.is_established() {
break 'outcome FspFrameOutcome::NotEstablished;
}
if received_k_bit != entry.current_k_bit() && entry.pending_new_session().is_some() {
info!(
src = %src_addr,
"Peer FSP K-bit flip detected, promoting new session"
);
let now_ms = Self::now_ms();
entry.handle_peer_kbit_flip(now_ms);
}
let session = match entry.state_mut() {
EndToEndState::Established(s) => s,
_ => break 'outcome FspFrameOutcome::NotEstablished,
};
let primary = {
let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FspDecrypt);
session.decrypt_with_replay_check_and_aad(
ciphertext,
header.counter,
&header.header_bytes,
)
};
let plaintext = match primary {
Ok(pt) => pt,
Err(primary_err) => {
let drain = entry.previous_noise_session_mut().and_then(|prev| {
prev.decrypt_with_replay_check_and_aad(
ciphertext,
header.counter,
&header.header_bytes,
)
.ok()
});
match drain {
Some(pt) => pt,
None => {
let consecutive = entry.record_decrypt_failure();
let recover_session =
should_start_decrypt_failure_rekey(entry, consecutive);
break 'outcome FspFrameOutcome::DecryptFailed {
error: primary_err,
counter: header.counter,
consecutive,
recover_session,
};
}
}
}
};
entry.reset_decrypt_failures();
if entry.handshake_payload().is_some() {
entry.clear_handshake_payload();
}
let (timestamp, msg_type, inner_flags_byte) = match fsp_strip_inner_header(&plaintext) {
Some((ts, mt, inf, _rest)) => (ts, mt, inf),
None => break 'outcome FspFrameOutcome::BadInnerHeader,
};
if let Some(mmp) = entry.mmp_mut() {
let now = std::time::Instant::now();
mmp.receiver
.record_recv(header.counter, timestamp, plaintext.len(), ce_flag, now);
let inner_flags = FspInnerFlags::from_byte(inner_flags_byte);
let _spin_rtt = mmp
.spin_bit
.rx_observe(inner_flags.spin_bit, header.counter, now);
mmp.path_mtu.observe_incoming_mtu(path_mtu);
}
FspFrameOutcome::Authentic {
plaintext,
msg_type,
inner_flags_byte,
timestamp,
}
};
let (plaintext, msg_type, _inner_flags_byte, _timestamp) = match outcome {
FspFrameOutcome::Authentic {
plaintext,
msg_type,
inner_flags_byte,
timestamp,
} => (plaintext, msg_type, inner_flags_byte, timestamp),
FspFrameOutcome::UnknownSession => {
debug!(src = %self.peer_display_name(src_addr), "Encrypted session message for unknown session");
return;
}
FspFrameOutcome::NotEstablished => {
debug!(
src = %self.peer_display_name(src_addr),
"Encrypted message but session not established (awaiting handshake completion)"
);
self.resend_handshake_after_early_encrypted_data(src_addr)
.await;
return;
}
FspFrameOutcome::BadInnerHeader => {
debug!(src = %self.peer_display_name(src_addr), "Decrypted payload too short for FSP inner header");
return;
}
FspFrameOutcome::DecryptFailed {
error,
counter,
consecutive,
recover_session,
} => {
debug!(
error = %error, src = %self.peer_display_name(src_addr),
counter, consecutive_failures = consecutive,
"Session AEAD decryption failed"
);
if recover_session {
warn!(
peer = %self.peer_display_name(src_addr),
consecutive_failures = consecutive,
"Session AEAD failures exceeded threshold; starting recovery rekey"
);
if !self.initiate_session_rekey(src_addr).await {
debug!(
peer = %self.peer_display_name(src_addr),
"Failed to start recovery rekey after decrypt-failure threshold"
);
}
}
return;
}
};
if let Some(next_hop) = previous_hop {
self.learn_reverse_route(*src_addr, next_hop);
}
let rest_len = plaintext.len() - FSP_INNER_HEADER_SIZE;
let rest = &plaintext[FSP_INNER_HEADER_SIZE..];
match SessionMessageType::from_byte(msg_type) {
Some(SessionMessageType::DataPacket) => {
if rest.len() < FSP_PORT_HEADER_SIZE {
debug!(len = rest.len(), "DataPacket too short for port header");
return;
}
let dst_port = u16::from_le_bytes([rest[2], rest[3]]);
let service_payload = &rest[FSP_PORT_HEADER_SIZE..];
match dst_port {
FSP_PORT_IPV6_SHIM => {
use crate::FipsAddress;
let src_ipv6 = FipsAddress::from_node_addr(src_addr).to_ipv6().octets();
let dst_ipv6 = FipsAddress::from_node_addr(self.node_addr())
.to_ipv6()
.octets();
match crate::upper::ipv6_shim::decompress_ipv6(
service_payload,
src_ipv6,
dst_ipv6,
) {
Some(mut packet) => {
if ce_flag {
mark_ipv6_ecn_ce(&mut packet);
self.stats_mut().congestion.record_ce_received();
}
if self.external_packet_tx.is_some() {
self.deliver_external_ipv6_packet(src_addr, packet);
} else if let Some(tun_tx) = &self.tun_tx {
let _t = crate::perf_profile::Timer::start(
crate::perf_profile::Stage::TunWrite,
);
if let Err(e) = tun_tx.send(packet) {
debug!(error = %e, "Failed to deliver decompressed IPv6 packet to TUN");
}
} else {
trace!(
src = %self.peer_display_name(src_addr),
"IPv6 shim packet decompressed (no TUN interface)"
);
}
}
None => {
debug!(
src = %self.peer_display_name(src_addr),
len = service_payload.len(),
"IPv6 shim decompression failed"
);
}
}
}
_ => {
debug!(
src = %self.peer_display_name(src_addr),
dst_port,
"Unknown FSP service port, dropping DataPacket"
);
}
}
}
Some(SessionMessageType::EndpointData) => {
let mut payload = plaintext;
payload.drain(..FSP_INNER_HEADER_SIZE);
self.deliver_endpoint_data(src_addr, payload);
}
Some(SessionMessageType::SenderReport) => {
self.handle_session_sender_report(src_addr, rest);
}
Some(SessionMessageType::ReceiverReport) => {
self.handle_session_receiver_report(src_addr, rest);
}
Some(SessionMessageType::PathMtuNotification) => {
self.handle_session_path_mtu_notification(src_addr, rest);
}
Some(SessionMessageType::CoordsWarmup) => {
trace!(src = %self.peer_display_name(src_addr), "CoordsWarmup received");
}
_ => {
debug!(src = %self.peer_display_name(src_addr), msg_type, "Unknown session message type, dropping");
}
}
if (msg_type == SessionMessageType::DataPacket.to_byte()
|| msg_type == SessionMessageType::EndpointData.to_byte())
&& let Some(entry) = self.sessions.get_mut(src_addr)
{
entry.record_recv(rest_len);
entry.touch(Self::now_ms());
}
self.flush_pending_packets(src_addr).await;
}
async fn handle_session_setup(&mut self, src_addr: &NodeAddr, inner: &[u8]) {
let setup = match SessionSetup::decode(inner) {
Ok(s) => s,
Err(e) => {
debug!(error = %e, "Malformed SessionSetup");
return;
}
};
if setup.handshake_payload.len() != XK_HANDSHAKE_MSG1_SIZE {
debug!(
len = setup.handshake_payload.len(),
expected = XK_HANDSHAKE_MSG1_SIZE,
"Invalid handshake payload size in SessionSetup"
);
return;
}
if let Some(existing) = self.sessions.get(src_addr) {
if existing.is_initiating() {
if self.identity.node_addr() < src_addr {
debug!(
src = %self.peer_display_name(src_addr),
"Simultaneous session initiation: we win (smaller addr), dropping their setup"
);
return;
}
debug!(
src = %self.peer_display_name(src_addr),
"Simultaneous session initiation: we lose, becoming responder"
);
} else if existing.is_awaiting_msg3() {
if let Some(payload) = existing.handshake_payload() {
debug!(src = %self.peer_display_name(src_addr), "Duplicate SessionSetup, resending SessionAck");
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *src_addr, payload.to_vec())
.with_ttl(self.config.node.session.default_ttl);
if let Err(e) = self.send_session_datagram(&mut datagram).await {
debug!(error = %e, dest = %self.peer_display_name(src_addr), "Failed to resend SessionAck");
}
} else {
debug!(src = %self.peer_display_name(src_addr), "Duplicate SessionSetup, no stored ack to resend");
}
return;
} else if existing.is_established() {
if self.config.node.rekey.enabled {
let rekey_in_progress = existing.has_rekey_in_progress();
let has_pending = existing.pending_new_session().is_some();
if rekey_in_progress {
if self.identity.node_addr() < src_addr {
debug!(
src = %self.peer_display_name(src_addr),
"Dual FSP rekey initiation: we win (smaller addr), dropping their msg1"
);
return;
}
debug!(
src = %self.peer_display_name(src_addr),
"Dual FSP rekey initiation: we lose (larger addr), abandoning ours"
);
let entry = self.sessions.get_mut(src_addr).unwrap();
entry.abandon_rekey();
} else if has_pending {
if pending_rekey_wins_tiebreak(
self.identity.node_addr(),
src_addr,
existing,
) {
debug!(
src = %self.peer_display_name(src_addr),
"FSP rekey msg1 received while local pending rekey wins tiebreak, dropping"
);
return;
}
debug!(
src = %self.peer_display_name(src_addr),
local_pending_initiator = existing.is_rekey_initiator(),
"FSP rekey msg1 received with stale pending rekey, abandoning pending and responding"
);
let entry = self.sessions.get_mut(src_addr).unwrap();
entry.abandon_rekey();
}
let our_keypair = self.identity.keypair();
let mut handshake = HandshakeState::new_xk_responder(our_keypair);
handshake.set_local_epoch(self.startup_epoch);
if let Err(e) = handshake.read_xk_message_1(&setup.handshake_payload) {
debug!(error = %e, "Failed to process rekey XK msg1");
return;
}
let msg2 = match handshake.write_xk_message_2() {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Failed to generate rekey XK msg2");
return;
}
};
let our_coords = self.tree_state.my_coords().clone();
let ack = SessionAck::new(our_coords, setup.src_coords).with_handshake(msg2);
let ack_payload = ack.encode();
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *src_addr, ack_payload)
.with_ttl(self.config.node.session.default_ttl);
if let Err(e) = self.send_session_datagram(&mut datagram).await {
debug!(error = %e, dest = %self.peer_display_name(src_addr), "Failed to send rekey SessionAck");
return;
}
let now_ms = Self::now_ms();
let entry = self.sessions.get_mut(src_addr).unwrap();
entry.set_rekey_state(handshake, false);
entry.record_peer_rekey(now_ms);
debug!(
src = %self.peer_display_name(src_addr),
"FSP rekey: processed peer's msg1, sent msg2, awaiting msg3"
);
return;
}
debug!(src = %self.peer_display_name(src_addr), "Session re-establishment from peer");
}
}
let our_keypair = self.identity.keypair();
let mut handshake = HandshakeState::new_xk_responder(our_keypair);
handshake.set_local_epoch(self.startup_epoch);
if let Err(e) = handshake.read_xk_message_1(&setup.handshake_payload) {
debug!(error = %e, "Failed to process Noise XK msg1 in SessionSetup");
return;
}
let msg2 = match handshake.write_xk_message_2() {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Failed to generate Noise XK msg2 for SessionAck");
return;
}
};
let our_coords = self.tree_state.my_coords().clone();
let ack = SessionAck::new(our_coords, setup.src_coords).with_handshake(msg2);
let ack_payload = ack.encode();
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *src_addr, ack_payload.clone())
.with_ttl(self.config.node.session.default_ttl);
if let Err(e) = self.send_session_datagram(&mut datagram).await {
debug!(error = %e, dest = %self.peer_display_name(src_addr), "Failed to send SessionAck");
return;
}
let placeholder_pubkey = self.identity.keypair().public_key();
let now_ms = Self::now_ms();
let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
let mut entry = SessionEntry::new(
*src_addr,
placeholder_pubkey,
EndToEndState::AwaitingMsg3(handshake),
now_ms,
false,
);
entry.set_handshake_payload(ack_payload, now_ms + resend_interval);
self.sessions.insert(*src_addr, entry);
debug!(src = %self.peer_display_name(src_addr), "SessionSetup processed (XK), SessionAck sent, awaiting msg3");
}
async fn handle_session_ack(&mut self, src_addr: &NodeAddr, inner: &[u8]) {
let ack = match SessionAck::decode(inner) {
Ok(a) => a,
Err(e) => {
debug!(error = %e, "Malformed SessionAck");
return;
}
};
if ack.handshake_payload.len() != XK_HANDSHAKE_MSG2_SIZE {
debug!(
len = ack.handshake_payload.len(),
expected = XK_HANDSHAKE_MSG2_SIZE,
"Invalid handshake payload size in SessionAck"
);
return;
}
let mut entry = match self.sessions.remove(src_addr) {
Some(e) => e,
None => {
debug!(src = %self.peer_display_name(src_addr), "SessionAck for unknown session");
return;
}
};
if entry.is_established() && entry.has_rekey_in_progress() && entry.is_rekey_initiator() {
let mut handshake = match entry.take_rekey_state() {
Some(hs) => hs,
None => {
self.sessions.insert(*src_addr, entry);
return;
}
};
if let Err(e) = handshake.read_xk_message_2(&ack.handshake_payload) {
debug!(error = %e, "Failed to process rekey XK msg2");
entry.abandon_rekey();
self.sessions.insert(*src_addr, entry);
return;
}
let msg3 = match handshake.write_xk_message_3() {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Failed to generate rekey XK msg3");
entry.abandon_rekey();
self.sessions.insert(*src_addr, entry);
return;
}
};
let msg3_wire = SessionMsg3::new(msg3);
let msg3_payload = msg3_wire.encode();
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *src_addr, msg3_payload)
.with_ttl(self.config.node.session.default_ttl);
if let Err(e) = self.send_session_datagram(&mut datagram).await {
debug!(error = %e, dest = %self.peer_display_name(src_addr), "Failed to send rekey SessionMsg3");
entry.abandon_rekey();
self.sessions.insert(*src_addr, entry);
return;
}
let session = match handshake.into_session() {
Ok(s) => s,
Err(e) => {
debug!(error = %e, "Failed to create session from rekey XK");
entry.abandon_rekey();
self.sessions.insert(*src_addr, entry);
return;
}
};
entry.set_pending_session(session);
entry.set_rekey_completed_ms(Self::now_ms());
self.sessions.insert(*src_addr, entry);
debug!(
src = %self.peer_display_name(src_addr),
"FSP rekey: completed XK as initiator, pending cutover"
);
return;
}
if entry.is_established() {
if let Some(payload) = entry.handshake_payload().map(<[u8]>::to_vec) {
if entry.resend_count() < self.config.node.rate_limit.handshake_max_resends {
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *src_addr, payload)
.with_ttl(self.config.node.session.default_ttl);
let sent = match self.send_session_datagram(&mut datagram).await {
Ok(()) => true,
Err(e) => {
debug!(
src = %self.peer_display_name(src_addr),
error = %e,
"Failed to resend final SessionMsg3 after duplicate SessionAck"
);
false
}
};
if sent {
let now_ms = Self::now_ms();
let interval = self.config.node.rate_limit.handshake_resend_interval_ms;
entry.record_resend(now_ms + interval);
debug!(
src = %self.peer_display_name(src_addr),
"Duplicate SessionAck after establishment, resent final SessionMsg3"
);
}
} else {
entry.clear_handshake_payload();
}
} else {
debug!(src = %self.peer_display_name(src_addr), "SessionAck for already-established session");
}
self.sessions.insert(*src_addr, entry);
return;
}
if !entry.is_initiating() {
debug!(src = %self.peer_display_name(src_addr), "SessionAck but session not in Initiating state");
self.sessions.insert(*src_addr, entry);
return;
}
let mut handshake = match entry.take_state() {
Some(EndToEndState::Initiating(hs)) => hs,
_ => unreachable!("checked is_initiating above"),
};
if let Err(e) = handshake.read_xk_message_2(&ack.handshake_payload) {
debug!(error = %e, "Failed to process Noise XK msg2 in SessionAck");
return; }
let msg3 = match handshake.write_xk_message_3() {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Failed to generate Noise XK msg3");
return;
}
};
let msg3_wire = SessionMsg3::new(msg3);
let msg3_payload = msg3_wire.encode();
let msg3_resend_payload = msg3_payload.clone();
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *src_addr, msg3_payload)
.with_ttl(self.config.node.session.default_ttl);
if let Err(e) = self.send_session_datagram(&mut datagram).await {
debug!(error = %e, dest = %self.peer_display_name(src_addr), "Failed to send SessionMsg3");
return;
}
let session = match handshake.into_session() {
Ok(s) => s,
Err(e) => {
debug!(error = %e, "Failed to create session after XK msg3");
return;
}
};
let now_ms = Self::now_ms();
entry.set_state(EndToEndState::Established(session));
entry.set_coords_warmup_remaining(self.config.node.session.coords_warmup_packets);
entry.mark_established(now_ms);
entry.init_mmp(&self.config.node.session_mmp);
let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
entry.set_handshake_payload(msg3_resend_payload, now_ms + resend_interval);
entry.touch(now_ms);
self.sessions.insert(*src_addr, entry);
self.coord_cache.insert(*src_addr, ack.src_coords, now_ms);
self.flush_pending_packets(src_addr).await;
info!(src = %self.peer_display_name(src_addr), "Session established (initiator, XK)");
}
async fn resend_handshake_after_early_encrypted_data(&mut self, src_addr: &NodeAddr) {
let max_resends = self.config.node.rate_limit.handshake_max_resends;
let payload = match self.sessions.get(src_addr) {
Some(entry)
if entry.handshake_payload().is_some() && entry.resend_count() < max_resends =>
{
entry.handshake_payload().map(<[u8]>::to_vec)
}
Some(entry) if entry.handshake_payload().is_some() => {
let name = self.peer_display_name(src_addr);
if let Some(entry) = self.sessions.get_mut(src_addr) {
entry.clear_handshake_payload();
}
debug!(
src = %name,
"Early encrypted data arrived after handshake resend budget was exhausted"
);
None
}
_ => None,
};
let Some(payload) = payload else {
return;
};
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *src_addr, payload)
.with_ttl(self.config.node.session.default_ttl);
let sent = match self.send_session_datagram(&mut datagram).await {
Ok(()) => true,
Err(e) => {
debug!(
src = %self.peer_display_name(src_addr),
error = %e,
"Failed to resend session handshake after early encrypted data"
);
false
}
};
if sent {
let now_ms = Self::now_ms();
let interval = self.config.node.rate_limit.handshake_resend_interval_ms;
if let Some(entry) = self.sessions.get_mut(src_addr) {
entry.record_resend(now_ms + interval);
}
debug!(
src = %self.peer_display_name(src_addr),
"Resent session handshake after early encrypted data"
);
}
}
async fn handle_session_msg3(&mut self, src_addr: &NodeAddr, inner: &[u8]) {
let msg3 = match SessionMsg3::decode(inner) {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Malformed SessionMsg3");
return;
}
};
if msg3.handshake_payload.len() != XK_HANDSHAKE_MSG3_SIZE {
debug!(
len = msg3.handshake_payload.len(),
expected = XK_HANDSHAKE_MSG3_SIZE,
"Invalid handshake payload size in SessionMsg3"
);
return;
}
let mut entry = match self.sessions.remove(src_addr) {
Some(e) => e,
None => {
debug!(src = %self.peer_display_name(src_addr), "SessionMsg3 for unknown session");
return;
}
};
if entry.is_established() && entry.has_rekey_in_progress() && !entry.is_rekey_initiator() {
let mut handshake = match entry.take_rekey_state() {
Some(hs) => hs,
None => {
self.sessions.insert(*src_addr, entry);
return;
}
};
if let Err(e) = handshake.read_xk_message_3(&msg3.handshake_payload) {
debug!(error = %e, "Failed to process rekey XK msg3");
entry.abandon_rekey();
self.sessions.insert(*src_addr, entry);
return;
}
let session = match handshake.into_session() {
Ok(s) => s,
Err(e) => {
debug!(error = %e, "Failed to create session from rekey XK msg3");
entry.abandon_rekey();
self.sessions.insert(*src_addr, entry);
return;
}
};
entry.set_pending_session(session);
self.sessions.insert(*src_addr, entry);
debug!(
src = %self.peer_display_name(src_addr),
"FSP rekey: completed XK as responder, pending cutover"
);
return;
}
if !entry.is_awaiting_msg3() {
debug!(src = %self.peer_display_name(src_addr), "SessionMsg3 but session not in AwaitingMsg3 state");
self.sessions.insert(*src_addr, entry);
return;
}
let mut handshake = match entry.take_state() {
Some(EndToEndState::AwaitingMsg3(hs)) => hs,
_ => unreachable!("checked is_awaiting_msg3 above"),
};
if let Err(e) = handshake.read_xk_message_3(&msg3.handshake_payload) {
debug!(error = %e, "Failed to process Noise XK msg3");
return; }
let remote_pubkey = match handshake.remote_static() {
Some(pk) => *pk,
None => {
debug!("No remote static key after processing XK msg3");
return;
}
};
self.register_identity(*src_addr, remote_pubkey);
let session = match handshake.into_session() {
Ok(s) => s,
Err(e) => {
debug!(error = %e, "Failed to create session from XK handshake");
return;
}
};
let now_ms = Self::now_ms();
let mut new_entry = SessionEntry::new(
*src_addr,
remote_pubkey,
EndToEndState::Established(session),
now_ms,
false,
);
new_entry.set_coords_warmup_remaining(self.config.node.session.coords_warmup_packets);
new_entry.mark_established(now_ms);
new_entry.init_mmp(&self.config.node.session_mmp);
new_entry.touch(now_ms);
self.sessions.insert(*src_addr, new_entry);
self.flush_pending_packets(src_addr).await;
info!(src = %self.peer_display_name(src_addr), "Session established (responder, XK)");
}
fn handle_session_sender_report(&mut self, src_addr: &NodeAddr, body: &[u8]) {
let sr = match SessionSenderReport::decode(body) {
Ok(sr) => sr,
Err(e) => {
debug!(src = %self.peer_display_name(src_addr), error = %e, "Malformed SessionSenderReport");
return;
}
};
trace!(
src = %self.peer_display_name(src_addr),
cum_pkts = sr.cumulative_packets_sent,
interval_bytes = sr.interval_bytes_sent,
"Received SessionSenderReport"
);
}
fn handle_session_receiver_report(&mut self, src_addr: &NodeAddr, body: &[u8]) {
let session_rr = match SessionReceiverReport::decode(body) {
Ok(rr) => rr,
Err(e) => {
debug!(src = %self.peer_display_name(src_addr), error = %e, "Malformed SessionReceiverReport");
return;
}
};
let rr: ReceiverReport = ReceiverReport::from(&session_rr);
let now_ms = Self::now_ms();
let peer_name = self.peer_display_name(src_addr);
let entry = match self.sessions.get_mut(src_addr) {
Some(e) => e,
None => {
debug!(src = %peer_name, "SessionReceiverReport for unknown session");
return;
}
};
let our_timestamp_ms = entry.session_timestamp(now_ms);
let Some(mmp) = entry.mmp_mut() else {
return;
};
let now = std::time::Instant::now();
mmp.metrics
.process_receiver_report(&rr, our_timestamp_ms, now);
if let Some(srtt_ms) = mmp.metrics.srtt_ms() {
let srtt_us = (srtt_ms * 1000.0) as i64;
mmp.sender.update_report_interval_with_bounds(
srtt_us,
MIN_SESSION_REPORT_INTERVAL_MS,
MAX_SESSION_REPORT_INTERVAL_MS,
);
mmp.receiver.update_report_interval_with_bounds(
srtt_us,
MIN_SESSION_REPORT_INTERVAL_MS,
MAX_SESSION_REPORT_INTERVAL_MS,
);
mmp.path_mtu.update_interval_from_srtt(srtt_ms);
}
let our_recv_packets = mmp.receiver.cumulative_packets_recv();
let peer_highest = mmp.receiver.highest_counter();
mmp.metrics
.update_reverse_delivery(our_recv_packets, peer_highest);
trace!(
src = %peer_name,
rtt_ms = ?mmp.metrics.srtt_ms(),
loss = format_args!("{:.1}%", mmp.metrics.loss_rate() * 100.0),
"Processed SessionReceiverReport"
);
}
pub(in crate::node) fn handle_session_path_mtu_notification(
&mut self,
src_addr: &NodeAddr,
body: &[u8],
) {
let notif = match PathMtuNotification::decode(body) {
Ok(n) => n,
Err(e) => {
debug!(src = %self.peer_display_name(src_addr), error = %e, "Malformed PathMtuNotification");
return;
}
};
let peer_name = self.peer_display_name(src_addr);
let entry = match self.sessions.get_mut(src_addr) {
Some(e) => e,
None => {
debug!(src = %peer_name, "PathMtuNotification for unknown session");
return;
}
};
let Some(mmp) = entry.mmp_mut() else {
return;
};
let old_mtu = mmp.path_mtu.current_mtu();
let now = std::time::Instant::now();
let changed = mmp.path_mtu.apply_notification(notif.path_mtu, now);
let new_mtu = mmp.path_mtu.current_mtu();
if !changed {
return;
}
debug!(
src = %peer_name,
old_mtu,
new_mtu,
"Path MTU changed via notification"
);
let fips_addr = crate::FipsAddress::from_node_addr(src_addr);
match self.path_mtu_lookup.write() {
Ok(mut map) => match map.get(&fips_addr).copied() {
Some(existing) if existing <= new_mtu => {
debug!(
dest = %peer_name,
fips_addr = %fips_addr,
new_mtu,
existing,
"PathMtuNotification: keeping tighter existing path_mtu_lookup value"
);
}
other => {
map.insert(fips_addr, new_mtu);
debug!(
dest = %peer_name,
fips_addr = %fips_addr,
new_mtu,
prior = ?other,
map_len = map.len(),
"PathMtuNotification: tightened path_mtu_lookup"
);
}
},
Err(e) => {
warn!(
dest = %peer_name,
fips_addr = %fips_addr,
new_mtu,
error = %e,
"path_mtu_lookup write lock poisoned; PathMtuNotification not reflected"
);
}
}
}
async fn handle_coords_required(&mut self, inner: &[u8]) {
self.stats_mut().errors.coords_required += 1;
let msg = match CoordsRequired::decode(inner) {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Malformed CoordsRequired");
return;
}
};
debug!(
dest = %msg.dest_addr,
reporter = %msg.reporter,
"CoordsRequired: transit router needs coordinates"
);
if self
.coords_response_rate_limiter
.should_send(&msg.dest_addr)
{
if let Some(entry) = self.sessions.get(&msg.dest_addr)
&& entry.is_established()
&& let Err(e) = self.send_coords_warmup(&msg.dest_addr).await
{
debug!(dest = %msg.dest_addr, error = %e,
"Failed to send CoordsWarmup in response to CoordsRequired");
}
} else {
trace!(dest = %msg.dest_addr,
"CoordsRequired response rate-limited, skipping standalone CoordsWarmup");
}
if self.has_cached_identity(&msg.dest_addr) {
self.maybe_initiate_lookup(&msg.dest_addr).await;
} else {
debug!(dest = %msg.dest_addr,
"Skipping discovery after CoordsRequired: no cached identity for target");
}
if let Some(entry) = self.sessions.get_mut(&msg.dest_addr) {
let n = self.config.node.session.coords_warmup_packets;
entry.set_coords_warmup_remaining(n);
debug!(
dest = %msg.dest_addr,
warmup_packets = n,
"Reset coords warmup counter after CoordsRequired"
);
}
}
async fn handle_path_broken(&mut self, inner: &[u8]) {
self.stats_mut().errors.path_broken += 1;
let msg = match PathBroken::decode(inner) {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Malformed PathBroken");
return;
}
};
debug!(
dest = %msg.dest_addr,
reporter = %msg.reporter,
"PathBroken: transit router reports routing failure"
);
if self
.coords_response_rate_limiter
.should_send(&msg.dest_addr)
{
if let Some(entry) = self.sessions.get(&msg.dest_addr)
&& entry.is_established()
&& let Err(e) = self.send_coords_warmup(&msg.dest_addr).await
{
debug!(dest = %msg.dest_addr, error = %e,
"Failed to send CoordsWarmup in response to PathBroken");
}
} else {
trace!(dest = %msg.dest_addr,
"PathBroken response rate-limited, skipping standalone CoordsWarmup");
}
self.coord_cache.remove(&msg.dest_addr);
if self.has_cached_identity(&msg.dest_addr) {
self.maybe_initiate_lookup(&msg.dest_addr).await;
} else {
debug!(dest = %msg.dest_addr,
"Skipping discovery after PathBroken: no cached identity for target");
}
if let Some(entry) = self.sessions.get_mut(&msg.dest_addr) {
let n = self.config.node.session.coords_warmup_packets;
entry.set_coords_warmup_remaining(n);
debug!(
dest = %msg.dest_addr,
warmup_packets = n,
"Reset coords warmup counter after PathBroken"
);
}
}
pub(in crate::node) async fn handle_mtu_exceeded(&mut self, inner: &[u8]) {
self.stats_mut().errors.mtu_exceeded += 1;
let msg = match MtuExceeded::decode(inner) {
Ok(m) => m,
Err(e) => {
debug!(error = %e, "Malformed MtuExceeded");
return;
}
};
let peer_name = self.peer_display_name(&msg.dest_addr);
debug!(
dest = %peer_name,
reporter = %msg.reporter,
bottleneck_mtu = msg.mtu,
"MtuExceeded: transit router reports oversized packet"
);
if let Some(entry) = self.sessions.get_mut(&msg.dest_addr)
&& let Some(mmp) = entry.mmp_mut()
{
let old_mtu = mmp.path_mtu.current_mtu();
let now = std::time::Instant::now();
if mmp.path_mtu.apply_notification(msg.mtu, now) {
let new_mtu = mmp.path_mtu.current_mtu();
info!(
dest = %peer_name,
old_mtu,
new_mtu,
reporter = %msg.reporter,
"Path MTU decreased via reactive MtuExceeded signal"
);
}
}
let fips_addr = crate::FipsAddress::from_node_addr(&msg.dest_addr);
match self.path_mtu_lookup.write() {
Ok(mut map) => match map.get(&fips_addr).copied() {
Some(existing) if existing <= msg.mtu => {
debug!(
dest = %peer_name,
fips_addr = %fips_addr,
bottleneck_mtu = msg.mtu,
existing,
"Reactive MtuExceeded: keeping tighter existing path_mtu_lookup value"
);
}
other => {
map.insert(fips_addr, msg.mtu);
debug!(
dest = %peer_name,
fips_addr = %fips_addr,
bottleneck_mtu = msg.mtu,
prior = ?other,
map_len = map.len(),
"Reactive MtuExceeded: tightened path_mtu_lookup"
);
}
},
Err(e) => {
warn!(
dest = %peer_name,
fips_addr = %fips_addr,
bottleneck_mtu = msg.mtu,
error = %e,
"path_mtu_lookup write lock poisoned; reactive MtuExceeded not reflected"
);
}
}
}
pub(in crate::node) async fn initiate_session(
&mut self,
dest_addr: NodeAddr,
dest_pubkey: PublicKey,
) -> Result<(), NodeError> {
if let Some(existing) = self.sessions.get(&dest_addr)
&& (existing.is_established() || existing.is_initiating())
{
return Ok(());
}
let our_keypair = self.identity.keypair();
let mut handshake = HandshakeState::new_xk_initiator(our_keypair, dest_pubkey);
handshake.set_local_epoch(self.startup_epoch);
let msg1 = handshake
.write_xk_message_1()
.map_err(|e| NodeError::SendFailed {
node_addr: dest_addr,
reason: format!("Noise XK msg1 generation failed: {}", e),
})?;
let our_coords = self.tree_state.my_coords().clone();
let dest_coords = self.get_dest_coords(&dest_addr);
let setup = SessionSetup::new(our_coords, dest_coords).with_handshake(msg1);
let setup_payload = setup.encode();
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, dest_addr, setup_payload.clone())
.with_ttl(self.config.node.session.default_ttl);
self.send_session_datagram(&mut datagram).await?;
self.register_identity(dest_addr, dest_pubkey);
let now_ms = Self::now_ms();
let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
let mut entry = SessionEntry::new(
dest_addr,
dest_pubkey,
EndToEndState::Initiating(handshake),
now_ms,
true,
);
entry.set_handshake_payload(setup_payload, now_ms + resend_interval);
self.sessions.insert(dest_addr, entry);
debug!(dest = %self.peer_display_name(&dest_addr), "Session initiation started");
Ok(())
}
pub(in crate::node) async fn send_session_data(
&mut self,
dest_addr: &NodeAddr,
src_port: u16,
dst_port: u16,
payload: &[u8],
) -> Result<(), NodeError> {
let now_ms = Self::now_ms();
let entry = self
.sessions
.get(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let wants_coords = entry.coords_warmup_remaining() > 0;
let timestamp = entry.session_timestamp(now_ms);
let spin_bit = entry.mmp().is_some_and(|m| m.spin_bit.tx_bit());
if !entry.is_established() {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "session not established".into(),
});
}
let mut port_payload = Vec::with_capacity(FSP_PORT_HEADER_SIZE + payload.len());
port_payload.extend_from_slice(&src_port.to_le_bytes());
port_payload.extend_from_slice(&dst_port.to_le_bytes());
port_payload.extend_from_slice(payload);
let msg_type = SessionMessageType::DataPacket.to_byte(); let inner_flags = FspInnerFlags { spin_bit }.to_byte();
let inner_plaintext =
fsp_prepend_inner_header(timestamp, msg_type, inner_flags, &port_payload);
let (include_coords, my_coords, dest_coords) = if wants_coords {
let src = self.tree_state.my_coords().clone();
let dst = self.get_dest_coords(dest_addr);
let coords_size = coords_wire_size(&src) + coords_wire_size(&dst);
let total_wire =
FIPS_OVERHEAD as usize + FSP_PORT_HEADER_SIZE + coords_size + payload.len();
if total_wire <= self.transport_mtu() as usize {
(true, Some(src), Some(dst))
} else {
if let Err(e) = self.send_coords_warmup(dest_addr).await {
debug!(dest = %self.peer_display_name(dest_addr), error = %e,
"Failed to send standalone CoordsWarmup before data packet");
}
(false, None, None)
}
} else {
(false, None, None)
};
if wants_coords && let Some(entry) = self.sessions.get_mut(dest_addr) {
entry.set_coords_warmup_remaining(entry.coords_warmup_remaining() - 1);
}
let mut flags = if include_coords { FSP_FLAG_CP } else { 0 };
if let Some(entry) = self.sessions.get(dest_addr)
&& entry.current_k_bit()
{
flags |= FSP_FLAG_K;
}
let entry = self
.sessions
.get_mut(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let session = match entry.state_mut() {
EndToEndState::Established(s) => s,
_ => {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "session not established".into(),
});
}
};
let counter = session.current_send_counter();
let payload_len = inner_plaintext.len() as u16;
let header = build_fsp_header(counter, flags, payload_len);
let ciphertext = {
let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FspEncrypt);
session
.encrypt_with_aad(&inner_plaintext, &header)
.map_err(|e| NodeError::SendFailed {
node_addr: *dest_addr,
reason: format!("session encrypt failed: {}", e),
})?
};
let mut fsp_payload = Vec::with_capacity(FSP_HEADER_SIZE + ciphertext.len() + 200);
fsp_payload.extend_from_slice(&header);
if let (Some(src), Some(dst)) = (&my_coords, &dest_coords) {
encode_coords(src, &mut fsp_payload);
encode_coords(dst, &mut fsp_payload);
}
fsp_payload.extend_from_slice(&ciphertext);
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *dest_addr, fsp_payload)
.with_ttl(self.config.node.session.default_ttl);
self.send_session_datagram(&mut datagram).await?;
if let Some(entry) = self.sessions.get_mut(dest_addr) {
entry.record_sent(payload.len());
if let Some(mmp) = entry.mmp_mut() {
mmp.sender.record_sent(counter, timestamp, ciphertext.len());
}
entry.touch(now_ms);
}
Ok(())
}
pub(in crate::node) async fn send_ipv6_packet(
&mut self,
dest_addr: &NodeAddr,
ipv6_packet: &[u8],
) -> Result<(), NodeError> {
let compressed = crate::upper::ipv6_shim::compress_ipv6(ipv6_packet).ok_or_else(|| {
NodeError::SendFailed {
node_addr: *dest_addr,
reason: "IPv6 header compression failed".into(),
}
})?;
self.send_session_data(
dest_addr,
FSP_PORT_IPV6_SHIM,
FSP_PORT_IPV6_SHIM,
&compressed,
)
.await
}
pub(in crate::node) async fn handle_endpoint_data_command(
&mut self,
command: NodeEndpointCommand,
) {
match command {
NodeEndpointCommand::Send {
remote,
payload,
queued_at,
response_tx,
} => {
crate::perf_profile::record_since(
crate::perf_profile::Stage::EndpointCommandWait,
queued_at,
);
let _t =
crate::perf_profile::Timer::start(crate::perf_profile::Stage::EndpointSend);
let result = self.send_endpoint_data(remote, payload).await;
let _ = response_tx.send(result);
}
NodeEndpointCommand::SendOneway {
remote,
payload,
queued_at,
} => {
crate::perf_profile::record_since(
crate::perf_profile::Stage::EndpointCommandWait,
queued_at,
);
let _t =
crate::perf_profile::Timer::start(crate::perf_profile::Stage::EndpointSend);
let _ = self.send_endpoint_data(remote, payload).await;
}
NodeEndpointCommand::PeerSnapshot { response_tx } => {
let peers = self
.peers()
.map(|peer| {
let link_id = peer.link_id();
let transport_type = self.get_link(&link_id).and_then(|link| {
self.get_transport(&link.transport_id())
.map(|handle| handle.transport_type().name.to_string())
});
let stats = peer.link_stats();
NodeEndpointPeer {
npub: peer.npub(),
transport_addr: peer.current_addr().map(|addr| addr.to_string()),
transport_type,
link_id: link_id.as_u64(),
srtt_ms: peer
.mmp()
.and_then(|mmp| mmp.metrics.srtt_ms())
.map(|srtt| srtt.round() as u64),
packets_sent: stats.packets_sent,
packets_recv: stats.packets_recv,
bytes_sent: stats.bytes_sent,
bytes_recv: stats.bytes_recv,
}
})
.collect();
let _ = response_tx.send(peers);
}
}
}
pub(in crate::node) async fn send_endpoint_data(
&mut self,
remote: crate::PeerIdentity,
payload: Vec<u8>,
) -> Result<(), NodeError> {
let dest_addr = *remote.node_addr();
if self
.sessions
.get(&dest_addr)
.is_some_and(|entry| entry.is_established())
{
return self.send_session_endpoint_data(&dest_addr, &payload).await;
}
let dest_pubkey = remote.pubkey_full();
self.register_identity(dest_addr, dest_pubkey);
self.send_or_queue_endpoint_data(dest_addr, Some(dest_pubkey), payload)
.await
}
async fn send_or_queue_endpoint_data(
&mut self,
dest_addr: NodeAddr,
dest_pubkey: Option<PublicKey>,
payload: Vec<u8>,
) -> Result<(), NodeError> {
if let Some(entry) = self.sessions.get(&dest_addr) {
if entry.is_established() {
return self.send_session_endpoint_data(&dest_addr, &payload).await;
}
self.queue_pending_endpoint_data(dest_addr, payload);
let should_discover = self.config.node.routing.mode
== crate::config::RoutingMode::ReplyLearned
|| self.find_next_hop(&dest_addr).is_none();
if should_discover {
self.maybe_initiate_lookup(&dest_addr).await;
}
return Ok(());
}
let dest_pubkey = dest_pubkey
.or_else(|| self.pubkey_for_node_addr(&dest_addr))
.ok_or_else(|| NodeError::SendFailed {
node_addr: dest_addr,
reason: "unknown remote identity for endpoint data".into(),
})?;
if self.find_next_hop(&dest_addr).is_none() {
self.queue_pending_endpoint_data(dest_addr, payload);
self.maybe_initiate_lookup(&dest_addr).await;
return Ok(());
}
match self.initiate_session(dest_addr, dest_pubkey).await {
Ok(()) => {}
Err(NodeError::SendFailed { node_addr, reason })
if node_addr == dest_addr && reason == "no route to destination" =>
{
self.queue_pending_endpoint_data(dest_addr, payload);
self.maybe_initiate_lookup(&dest_addr).await;
return Ok(());
}
Err(error) => return Err(error),
}
self.queue_pending_endpoint_data(dest_addr, payload);
Ok(())
}
async fn send_session_endpoint_data(
&mut self,
dest_addr: &NodeAddr,
payload: &[u8],
) -> Result<(), NodeError> {
if payload.len() > u16::MAX as usize - FSP_INNER_HEADER_SIZE {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "endpoint data payload too long".into(),
});
}
let now_ms = Self::now_ms();
let entry = self
.sessions
.get(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let wants_coords = entry.coords_warmup_remaining() > 0;
let timestamp = entry.session_timestamp(now_ms);
let spin_bit = entry.mmp().is_some_and(|m| m.spin_bit.tx_bit());
if !entry.is_established() {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "session not established".into(),
});
}
let msg_type = SessionMessageType::EndpointData.to_byte();
let inner_flags = FspInnerFlags { spin_bit }.to_byte();
let inner_plaintext = fsp_prepend_inner_header(timestamp, msg_type, inner_flags, payload);
let (include_coords, my_coords, dest_coords) = if wants_coords {
let src = self.tree_state.my_coords().clone();
let dst = self.get_dest_coords(dest_addr);
let coords_size = coords_wire_size(&src) + coords_wire_size(&dst);
let total_wire = FIPS_OVERHEAD as usize + coords_size + payload.len();
if total_wire <= self.transport_mtu() as usize {
(true, Some(src), Some(dst))
} else {
if let Err(e) = self.send_coords_warmup(dest_addr).await {
debug!(dest = %self.peer_display_name(dest_addr), error = %e,
"Failed to send standalone CoordsWarmup before endpoint data");
}
(false, None, None)
}
} else {
(false, None, None)
};
if wants_coords && let Some(entry) = self.sessions.get_mut(dest_addr) {
entry.set_coords_warmup_remaining(entry.coords_warmup_remaining() - 1);
}
let mut flags = if include_coords { FSP_FLAG_CP } else { 0 };
if let Some(entry) = self.sessions.get(dest_addr)
&& entry.current_k_bit()
{
flags |= FSP_FLAG_K;
}
if self
.try_send_session_endpoint_data_pipelined(PipelinedEndpointSend {
dest_addr,
payload,
now_ms,
timestamp,
fsp_flags: flags,
inner_plaintext: &inner_plaintext,
my_coords: my_coords.as_ref(),
dest_coords: dest_coords.as_ref(),
})
.await?
{
return Ok(());
}
let entry = self
.sessions
.get_mut(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let session = match entry.state_mut() {
EndToEndState::Established(s) => s,
_ => {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "session not established".into(),
});
}
};
let counter = session.current_send_counter();
let payload_len = inner_plaintext.len() as u16;
let header = build_fsp_header(counter, flags, payload_len);
let ciphertext = {
let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FspEncrypt);
session
.encrypt_with_aad(&inner_plaintext, &header)
.map_err(|e| NodeError::SendFailed {
node_addr: *dest_addr,
reason: format!("session encrypt failed: {}", e),
})?
};
let mut fsp_payload = Vec::with_capacity(FSP_HEADER_SIZE + ciphertext.len() + 200);
fsp_payload.extend_from_slice(&header);
if let (Some(src), Some(dst)) = (&my_coords, &dest_coords) {
encode_coords(src, &mut fsp_payload);
encode_coords(dst, &mut fsp_payload);
}
fsp_payload.extend_from_slice(&ciphertext);
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *dest_addr, fsp_payload)
.with_ttl(self.config.node.session.default_ttl);
self.send_session_datagram(&mut datagram).await?;
if let Some(entry) = self.sessions.get_mut(dest_addr) {
entry.record_sent(payload.len());
if let Some(mmp) = entry.mmp_mut() {
mmp.sender.record_sent(counter, timestamp, ciphertext.len());
}
entry.touch(now_ms);
}
Ok(())
}
#[cfg(unix)]
async fn try_send_session_endpoint_data_pipelined(
&mut self,
send: PipelinedEndpointSend<'_>,
) -> Result<bool, NodeError> {
let dest_addr = send.dest_addr;
let Some(workers) = self.encrypt_workers.as_ref().cloned() else {
return Ok(false);
};
let Some(next_hop_addr) = self.find_next_hop(dest_addr).map(|peer| *peer.node_addr())
else {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no route to destination".into(),
});
};
let mut path_mtu = u16::MAX;
if let Some(peer) = self.peers.get(&next_hop_addr)
&& let Some(tid) = peer.transport_id()
&& let Some(transport) = self.transports.get(&tid)
{
if let Some(addr) = peer.current_addr() {
path_mtu = path_mtu.min(transport.link_mtu(addr));
} else {
path_mtu = path_mtu.min(transport.mtu());
}
}
let (their_index, transport_id, remote_addr, timestamp_ms, fmp_flags, fmp_cipher) = {
let peer = self
.peers
.get_mut(&next_hop_addr)
.ok_or(NodeError::PeerNotFound(next_hop_addr))?;
let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
node_addr: next_hop_addr,
reason: "no their_index".into(),
})?;
let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
node_addr: next_hop_addr,
reason: "no transport_id".into(),
})?;
let remote_addr =
peer.current_addr()
.cloned()
.ok_or_else(|| NodeError::SendFailed {
node_addr: next_hop_addr,
reason: "no current_addr".into(),
})?;
let timestamp_ms = peer.session_elapsed_ms();
let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
let mut fmp_flags = if sp_flag { FLAG_SP } else { 0 };
if peer.current_k_bit() {
fmp_flags |= FLAG_KEY_EPOCH;
}
let session = peer
.noise_session_mut()
.ok_or_else(|| NodeError::SendFailed {
node_addr: next_hop_addr,
reason: "no noise session".into(),
})?;
let Some(fmp_cipher) = session.send_cipher_clone() else {
return Ok(false);
};
(
their_index,
transport_id,
remote_addr,
timestamp_ms,
fmp_flags,
fmp_cipher,
)
};
#[cfg(any(target_os = "linux", target_os = "macos"))]
let connected_socket = self
.peers
.get(&next_hop_addr)
.and_then(|peer| peer.connected_udp());
let transport = self
.transports
.get(&transport_id)
.ok_or(NodeError::TransportNotFound(transport_id))?;
let TransportHandle::Udp(udp) = transport else {
return Ok(false);
};
let socket_addr = {
#[cfg(any(target_os = "linux", target_os = "macos"))]
{
match connected_socket.as_ref() {
Some(socket) => Some(socket.peer_addr()),
None => udp.resolve_for_off_task(&remote_addr).await.ok(),
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
udp.resolve_for_off_task(&remote_addr).await.ok()
}
};
let Some(socket_addr) = socket_addr else {
return Ok(false);
};
let Some(socket) = udp.async_socket() else {
return Ok(false);
};
let (fsp_counter, fsp_cipher) = {
let entry = self
.sessions
.get_mut(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
if let Some(mmp) = entry.mmp_mut() {
mmp.path_mtu.seed_source_mtu(path_mtu);
}
let session = match entry.state_mut() {
EndToEndState::Established(s) => s,
_ => {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "session not established".into(),
});
}
};
let Some(fsp_cipher) = session.send_cipher_clone() else {
return Ok(false);
};
let counter = session
.take_send_counter()
.map_err(|e| NodeError::SendFailed {
node_addr: *dest_addr,
reason: format!("session counter reservation failed: {}", e),
})?;
(counter, fsp_cipher)
};
let fsp_header = build_fsp_header(
fsp_counter,
send.fsp_flags,
send.inner_plaintext.len() as u16,
);
let coords_size = match (send.my_coords, send.dest_coords) {
(Some(src), Some(dst)) => coords_wire_size(src) + coords_wire_size(dst),
_ => 0,
};
let link_plaintext_len = SESSION_DATAGRAM_HEADER_SIZE
+ FSP_HEADER_SIZE
+ coords_size
+ send.inner_plaintext.len();
let fmp_inner_len = 4 + link_plaintext_len + crate::noise::TAG_SIZE;
let fmp_counter = {
let peer = self
.peers
.get_mut(&next_hop_addr)
.ok_or(NodeError::PeerNotFound(next_hop_addr))?;
let session = peer
.noise_session_mut()
.ok_or_else(|| NodeError::SendFailed {
node_addr: next_hop_addr,
reason: "no noise session".into(),
})?;
session
.take_send_counter()
.map_err(|e| NodeError::SendFailed {
node_addr: next_hop_addr,
reason: format!("counter reservation failed: {}", e),
})?
};
let fmp_header =
build_established_header(their_index, fmp_counter, fmp_flags, fmp_inner_len as u16);
let wire_capacity = ESTABLISHED_HEADER_SIZE + fmp_inner_len + crate::noise::TAG_SIZE;
let mut wire_buf = Vec::with_capacity(wire_capacity);
wire_buf.extend_from_slice(&fmp_header);
wire_buf.extend_from_slice(×tamp_ms.to_le_bytes());
wire_buf.push(LinkMessageType::SessionDatagram.to_byte());
wire_buf.push(self.config.node.session.default_ttl);
wire_buf.extend_from_slice(&path_mtu.to_le_bytes());
wire_buf.extend_from_slice(self.node_addr().as_bytes());
wire_buf.extend_from_slice(dest_addr.as_bytes());
let fsp_aad_offset = wire_buf.len();
wire_buf.extend_from_slice(&fsp_header);
if let (Some(src), Some(dst)) = (send.my_coords, send.dest_coords) {
encode_coords(src, &mut wire_buf);
encode_coords(dst, &mut wire_buf);
}
let fsp_plaintext_offset = wire_buf.len();
wire_buf.extend_from_slice(send.inner_plaintext);
let predicted_bytes = wire_capacity;
if let Some(peer) = self.peers.get_mut(&next_hop_addr) {
peer.link_stats_mut().record_sent(predicted_bytes);
if let Some(mmp) = peer.mmp_mut() {
mmp.sender
.record_sent(fmp_counter, timestamp_ms, predicted_bytes);
}
}
self.stats_mut()
.forwarding
.record_originated(link_plaintext_len + crate::noise::TAG_SIZE);
if let Some(entry) = self.sessions.get_mut(dest_addr) {
entry.record_sent(send.payload.len());
if let Some(mmp) = entry.mmp_mut() {
mmp.sender.record_sent(
fsp_counter,
send.timestamp,
send.inner_plaintext.len() + crate::noise::TAG_SIZE,
);
}
entry.touch(send.now_ms);
}
workers.dispatch(crate::node::encrypt_worker::FmpSendJob {
cipher: fmp_cipher,
counter: fmp_counter,
wire_buf,
fsp_seal: Some(crate::node::encrypt_worker::FspSealJob {
cipher: fsp_cipher,
counter: fsp_counter,
aad_offset: fsp_aad_offset,
plaintext_offset: fsp_plaintext_offset,
}),
socket,
dest_addr: socket_addr,
#[cfg(any(target_os = "linux", target_os = "macos"))]
connected_socket,
drop_on_backpressure: true,
queued_at: crate::perf_profile::stamp(),
});
Ok(true)
}
#[cfg(not(unix))]
async fn try_send_session_endpoint_data_pipelined(
&mut self,
_send: PipelinedEndpointSend<'_>,
) -> Result<bool, NodeError> {
Ok(false)
}
fn deliver_endpoint_data(&self, src_addr: &NodeAddr, payload: Vec<u8>) {
let Some(endpoint_event_tx) = &self.endpoint_event_tx else {
trace!(
src = %self.peer_display_name(src_addr),
"Endpoint data received without an attached endpoint"
);
return;
};
let event = NodeEndpointEvent::Data {
source_node_addr: *src_addr,
source_npub: self.npub_for_node_addr(src_addr),
payload,
queued_at: crate::perf_profile::stamp(),
};
let _t_deliver =
crate::perf_profile::Timer::start(crate::perf_profile::Stage::EndpointDeliver);
if let Err(error) = endpoint_event_tx.send(event) {
debug!(
src = %self.peer_display_name(src_addr),
error = %error,
"Failed to deliver endpoint data event"
);
}
}
pub(in crate::node) async fn send_session_msg(
&mut self,
dest_addr: &NodeAddr,
msg_type: u8,
payload: &[u8],
) -> Result<(), NodeError> {
let now_ms = Self::now_ms();
let entry = self
.sessions
.get(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let timestamp = entry.session_timestamp(now_ms);
let spin_bit = entry.mmp().is_some_and(|m| m.spin_bit.tx_bit());
let inner_flags = FspInnerFlags { spin_bit }.to_byte();
let entry = self
.sessions
.get_mut(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let k_flags = if entry.current_k_bit() { FSP_FLAG_K } else { 0 };
let session = match entry.state_mut() {
EndToEndState::Established(s) => s,
_ => {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "session not established".into(),
});
}
};
let counter = session.current_send_counter();
let inner_plaintext = fsp_prepend_inner_header(timestamp, msg_type, inner_flags, payload);
let payload_len = inner_plaintext.len() as u16;
let header = build_fsp_header(counter, k_flags, payload_len);
let ciphertext = session
.encrypt_with_aad(&inner_plaintext, &header)
.map_err(|e| NodeError::SendFailed {
node_addr: *dest_addr,
reason: format!("session encrypt failed: {}", e),
})?;
let mut fsp_payload = Vec::with_capacity(FSP_HEADER_SIZE + ciphertext.len());
fsp_payload.extend_from_slice(&header);
fsp_payload.extend_from_slice(&ciphertext);
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *dest_addr, fsp_payload)
.with_ttl(self.config.node.session.default_ttl);
self.send_session_datagram(&mut datagram).await?;
if let Some(entry) = self.sessions.get_mut(dest_addr)
&& let Some(mmp) = entry.mmp_mut()
{
mmp.sender.record_sent(counter, timestamp, ciphertext.len());
}
Ok(())
}
async fn send_coords_warmup(&mut self, dest_addr: &NodeAddr) -> Result<(), NodeError> {
let now_ms = Self::now_ms();
let my_coords = self.tree_state.my_coords().clone();
let dest_coords = self.get_dest_coords(dest_addr);
let entry = self
.sessions
.get(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let timestamp = entry.session_timestamp(now_ms);
let spin_bit = entry.mmp().is_some_and(|m| m.spin_bit.tx_bit());
let entry = self
.sessions
.get_mut(dest_addr)
.ok_or_else(|| NodeError::SendFailed {
node_addr: *dest_addr,
reason: "no session".into(),
})?;
let session = match entry.state_mut() {
EndToEndState::Established(s) => s,
_ => {
return Err(NodeError::SendFailed {
node_addr: *dest_addr,
reason: "session not established".into(),
});
}
};
let counter = session.current_send_counter();
let msg_type = SessionMessageType::CoordsWarmup.to_byte();
let inner_flags = FspInnerFlags { spin_bit }.to_byte();
let inner_plaintext = fsp_prepend_inner_header(timestamp, msg_type, inner_flags, &[]);
let payload_len = inner_plaintext.len() as u16;
let header = build_fsp_header(counter, FSP_FLAG_CP, payload_len);
let ciphertext = session
.encrypt_with_aad(&inner_plaintext, &header)
.map_err(|e| NodeError::SendFailed {
node_addr: *dest_addr,
reason: format!("session encrypt failed: {}", e),
})?;
let coords_size = coords_wire_size(&my_coords) + coords_wire_size(&dest_coords);
let mut fsp_payload = Vec::with_capacity(FSP_HEADER_SIZE + coords_size + ciphertext.len());
fsp_payload.extend_from_slice(&header);
encode_coords(&my_coords, &mut fsp_payload);
encode_coords(&dest_coords, &mut fsp_payload);
fsp_payload.extend_from_slice(&ciphertext);
let my_addr = *self.node_addr();
let mut datagram = SessionDatagram::new(my_addr, *dest_addr, fsp_payload)
.with_ttl(self.config.node.session.default_ttl);
self.send_session_datagram(&mut datagram).await?;
if let Some(entry) = self.sessions.get_mut(dest_addr)
&& let Some(mmp) = entry.mmp_mut()
{
mmp.sender.record_sent(counter, timestamp, ciphertext.len());
}
debug!(dest = %self.peer_display_name(dest_addr), "Sent standalone CoordsWarmup");
Ok(())
}
pub(in crate::node) async fn send_session_datagram(
&mut self,
datagram: &mut SessionDatagram,
) -> Result<(), NodeError> {
let next_hop_addr = match self.find_next_hop(&datagram.dest_addr) {
Some(peer) => *peer.node_addr(),
None => {
return Err(NodeError::SendFailed {
node_addr: datagram.dest_addr,
reason: "no route to destination".into(),
});
}
};
if let Some(peer) = self.peers.get(&next_hop_addr)
&& let Some(tid) = peer.transport_id()
&& let Some(transport) = self.transports.get(&tid)
{
if let Some(addr) = peer.current_addr() {
datagram.path_mtu = datagram.path_mtu.min(transport.link_mtu(addr));
} else {
datagram.path_mtu = datagram.path_mtu.min(transport.mtu());
}
}
if let Some(entry) = self.sessions.get_mut(&datagram.dest_addr)
&& let Some(mmp) = entry.mmp_mut()
{
mmp.path_mtu.seed_source_mtu(datagram.path_mtu);
}
let encoded = datagram.encode();
if let Err(err) = self
.send_encrypted_link_message(&next_hop_addr, &encoded)
.await
{
self.record_route_failure(datagram.dest_addr, next_hop_addr);
return Err(err);
}
self.stats_mut().forwarding.record_originated(encoded.len());
Ok(())
}
pub(in crate::node) fn get_dest_coords(&self, dest: &NodeAddr) -> crate::tree::TreeCoordinate {
let now_ms = Self::now_ms();
if let Some(coords) = self.coord_cache.get(dest, now_ms) {
return coords.clone();
}
self.tree_state.my_coords().clone()
}
pub(in crate::node) fn now_ms() -> u64 {
crate::time::now_ms()
}
pub(in crate::node) async fn handle_tun_outbound(&mut self, ipv6_packet: Vec<u8>) {
if ipv6_packet.len() < 40 || ipv6_packet[0] >> 4 != 6 {
return;
}
let effective_mtu = self.effective_ipv6_mtu() as usize;
if ipv6_packet.len() > effective_mtu {
self.send_icmpv6_packet_too_big(&ipv6_packet, effective_mtu as u32);
return;
}
let mut prefix = [0u8; 15];
prefix.copy_from_slice(&ipv6_packet[25..40]);
let (dest_addr, dest_pubkey) = match self.lookup_by_fips_prefix(&prefix) {
Some((addr, pk)) => (addr, pk),
None => {
self.send_icmpv6_dest_unreachable(&ipv6_packet);
return;
}
};
if let Some(entry) = self.sessions.get(&dest_addr) {
if entry.is_established() {
if let Some(mmp) = entry.mmp() {
let path_mtu = mmp.path_mtu.current_mtu();
let path_ipv6_mtu = crate::upper::icmp::effective_ipv6_mtu(path_mtu) as usize;
if path_ipv6_mtu < effective_mtu && ipv6_packet.len() > path_ipv6_mtu {
self.send_icmpv6_packet_too_big(&ipv6_packet, path_ipv6_mtu as u32);
return;
}
}
if let Err(e) = self.send_ipv6_packet(&dest_addr, &ipv6_packet).await {
debug!(dest = %self.peer_display_name(&dest_addr), error = %e, "Failed to send TUN packet via session");
}
return;
}
self.queue_pending_packet(dest_addr, ipv6_packet);
let should_discover = self.config.node.routing.mode
== crate::config::RoutingMode::ReplyLearned
|| self.find_next_hop(&dest_addr).is_none();
if should_discover {
self.maybe_initiate_lookup(&dest_addr).await;
}
return;
}
if let Err(e) = self.initiate_session(dest_addr, dest_pubkey).await {
debug!(dest = %self.peer_display_name(&dest_addr), error = %e, "Failed to initiate session, trying discovery");
self.maybe_initiate_lookup(&dest_addr).await;
self.queue_pending_packet(dest_addr, ipv6_packet);
return;
}
self.queue_pending_packet(dest_addr, ipv6_packet);
}
pub(in crate::node) fn send_icmpv6_dest_unreachable(&self, original_packet: &[u8]) {
use crate::FipsAddress;
use crate::upper::icmp::{
DestUnreachableCode, build_dest_unreachable, should_send_icmp_error,
};
if !should_send_icmp_error(original_packet) {
return;
}
let our_ipv6 = FipsAddress::from_node_addr(self.node_addr()).to_ipv6();
if let Some(response) =
build_dest_unreachable(original_packet, DestUnreachableCode::NoRoute, our_ipv6)
&& let Some(tun_tx) = &self.tun_tx
{
let _ = tun_tx.send(response);
}
}
pub(in crate::node) fn send_icmpv6_packet_too_big(&mut self, original_packet: &[u8], mtu: u32) {
use crate::upper::icmp::build_packet_too_big;
use std::net::Ipv6Addr;
if original_packet.len() < 40 {
return;
}
let src_addr = Ipv6Addr::from(<[u8; 16]>::try_from(&original_packet[8..24]).unwrap());
if !self.icmp_rate_limiter.should_send(src_addr) {
debug!(
src = %src_addr,
"Rate limiting ICMP Packet Too Big"
);
return;
}
let dest_addr = Ipv6Addr::from(<[u8; 16]>::try_from(&original_packet[24..40]).unwrap());
if let Some(response) = build_packet_too_big(original_packet, mtu, dest_addr)
&& let Some(tun_tx) = &self.tun_tx
{
debug!(
original_src = %src_addr,
original_dst = %dest_addr,
packet_size = original_packet.len(),
reported_mtu = mtu,
"Sending ICMP Packet Too Big"
);
let _ = tun_tx.send(response);
}
}
fn queue_pending_packet(&mut self, dest_addr: NodeAddr, packet: Vec<u8>) {
let max_dests = self.config.node.session.pending_max_destinations;
if !self.pending_tun_packets.contains_key(&dest_addr)
&& self.pending_tun_packets.len() >= max_dests
{
return;
}
let queue = self.pending_tun_packets.entry(dest_addr).or_default();
if queue.len() >= self.config.node.session.pending_packets_per_dest {
queue.pop_front(); }
queue.push_back(packet);
}
fn queue_pending_endpoint_data(&mut self, dest_addr: NodeAddr, payload: Vec<u8>) {
let max_dests = self.config.node.session.pending_max_destinations;
if !self.pending_endpoint_data.contains_key(&dest_addr)
&& self.pending_endpoint_data.len() >= max_dests
{
return;
}
let queue = self.pending_endpoint_data.entry(dest_addr).or_default();
if queue.len() >= self.config.node.session.pending_packets_per_dest {
queue.pop_front();
}
queue.push_back(payload);
}
async fn flush_pending_packets(&mut self, dest_addr: &NodeAddr) {
if let Some(packets) = self.pending_tun_packets.remove(dest_addr) {
for packet in packets {
if let Err(e) = self.send_ipv6_packet(dest_addr, &packet).await {
debug!(dest = %self.peer_display_name(dest_addr), error = %e, "Failed to send queued TUN packet");
break;
}
}
}
if let Some(payloads) = self.pending_endpoint_data.remove(dest_addr) {
for payload in payloads {
if let Err(e) = self.send_session_endpoint_data(dest_addr, &payload).await {
debug!(dest = %self.peer_display_name(dest_addr), error = %e, "Failed to send queued endpoint data");
break;
}
}
}
}
pub(in crate::node) async fn retry_session_after_discovery(&mut self, dest_addr: NodeAddr) {
let mut prefix = [0u8; 15];
prefix.copy_from_slice(&dest_addr.as_bytes()[0..15]);
let dest_pubkey = match self.lookup_by_fips_prefix(&prefix) {
Some((_, pk)) => pk,
None => {
debug!(dest = %self.peer_display_name(&dest_addr), "Discovery complete but no identity for session retry");
return;
}
};
if let Some(existing) = self.sessions.get(&dest_addr)
&& (existing.is_established() || existing.is_initiating())
{
return;
}
match self.initiate_session(dest_addr, dest_pubkey).await {
Ok(()) => {
debug!(dest = %self.peer_display_name(&dest_addr), "Session initiated after discovery");
}
Err(e) => {
debug!(dest = %self.peer_display_name(&dest_addr), error = %e, "Session retry after discovery failed");
}
}
}
}
pub(in crate::node) fn mark_ipv6_ecn_ce(packet: &mut [u8]) {
if packet.len() < 2 {
return;
}
let tc = ((packet[0] & 0x0F) << 4) | (packet[1] >> 4);
let ecn = tc & 0x03;
if ecn == 0 {
return;
}
let new_tc = tc | 0x03;
packet[0] = (packet[0] & 0xF0) | (new_tc >> 4);
packet[1] = (new_tc << 4) | (packet[1] & 0x0F);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Identity;
use crate::noise::{NoiseError, NoiseSession};
fn node_addr(byte: u8) -> NodeAddr {
let mut bytes = [0u8; 16];
bytes[0] = byte;
NodeAddr::from_bytes(bytes)
}
fn make_xk_session_pair(
initiator: &Identity,
responder: &Identity,
) -> (NoiseSession, NoiseSession) {
let mut initiator_hs =
HandshakeState::new_xk_initiator(initiator.keypair(), responder.pubkey_full());
let mut responder_hs = HandshakeState::new_xk_responder(responder.keypair());
initiator_hs.set_local_epoch([1u8; 8]);
responder_hs.set_local_epoch([2u8; 8]);
let msg1 = initiator_hs.write_xk_message_1().unwrap();
responder_hs.read_xk_message_1(&msg1).unwrap();
let msg2 = responder_hs.write_xk_message_2().unwrap();
initiator_hs.read_xk_message_2(&msg2).unwrap();
let msg3 = initiator_hs.write_xk_message_3().unwrap();
responder_hs.read_xk_message_3(&msg3).unwrap();
(
initiator_hs.into_session().unwrap(),
responder_hs.into_session().unwrap(),
)
}
fn make_xk_session(initiator: &Identity, responder: &Identity) -> NoiseSession {
make_xk_session_pair(initiator, responder).0
}
fn encrypt_frame(session: &mut NoiseSession, plaintext: &[u8], aad: &[u8]) -> (u64, Vec<u8>) {
let counter = session.current_send_counter();
let ciphertext = session.encrypt_with_aad(plaintext, aad).unwrap();
(counter, ciphertext)
}
fn decrypt_current(
entry: &mut SessionEntry,
ciphertext: &[u8],
counter: u64,
aad: &[u8],
) -> Result<Vec<u8>, NoiseError> {
match entry.state_mut() {
EndToEndState::Established(session) => {
session.decrypt_with_replay_check_and_aad(ciphertext, counter, aad)
}
_ => unreachable!("test entry is established"),
}
}
fn established_entry(local: &Identity, peer: &Identity) -> SessionEntry {
let session = make_xk_session(local, peer);
SessionEntry::new(
*peer.node_addr(),
peer.pubkey_full(),
EndToEndState::Established(session),
1000,
true,
)
}
#[test]
fn pending_rekey_tiebreak_keeps_local_initiator_only_when_smaller() {
let local = Identity::generate();
let peer = Identity::generate();
let mut entry = established_entry(&local, &peer);
let rekey = HandshakeState::new_xk_initiator(local.keypair(), peer.pubkey_full());
entry.set_rekey_state(rekey, true);
entry.set_pending_session(make_xk_session(&local, &peer));
assert!(pending_rekey_wins_tiebreak(
&node_addr(0x01),
&node_addr(0x02),
&entry
));
assert!(!pending_rekey_wins_tiebreak(
&node_addr(0x02),
&node_addr(0x01),
&entry
));
}
#[test]
fn pending_rekey_tiebreak_does_not_keep_responder_pending() {
let local = Identity::generate();
let peer = Identity::generate();
let mut entry = established_entry(&local, &peer);
let rekey = HandshakeState::new_xk_responder(local.keypair());
entry.set_rekey_state(rekey, false);
entry.set_pending_session(make_xk_session(&peer, &local));
assert!(!pending_rekey_wins_tiebreak(
&node_addr(0x01),
&node_addr(0x02),
&entry
));
}
#[test]
fn decrypt_failure_recovery_rekey_requires_threshold_and_no_pending_rekey() {
let local = Identity::generate();
let peer = Identity::generate();
let mut entry = established_entry(&local, &peer);
assert!(!should_start_decrypt_failure_rekey(
&entry,
DECRYPT_FAILURE_RECOVERY_THRESHOLD - 1
));
assert!(should_start_decrypt_failure_rekey(
&entry,
DECRYPT_FAILURE_RECOVERY_THRESHOLD
));
let rekey = HandshakeState::new_xk_initiator(local.keypair(), peer.pubkey_full());
entry.set_rekey_state(rekey, true);
assert!(!should_start_decrypt_failure_rekey(
&entry,
DECRYPT_FAILURE_RECOVERY_THRESHOLD
));
entry.abandon_rekey();
entry.set_pending_session(make_xk_session(&local, &peer));
assert!(!should_start_decrypt_failure_rekey(
&entry,
DECRYPT_FAILURE_RECOVERY_THRESHOLD
));
}
#[test]
fn recovery_rekey_keeps_old_session_usable_until_and_after_cutover() {
let local = Identity::generate();
let peer = Identity::generate();
let aad = b"fsp-test-aad";
let (mut old_sender, old_receiver) = make_xk_session_pair(&peer, &local);
let (mut new_sender, new_receiver) = make_xk_session_pair(&peer, &local);
let mut entry = SessionEntry::new(
*peer.node_addr(),
peer.pubkey_full(),
EndToEndState::Established(old_receiver),
1000,
false,
);
let rekey = HandshakeState::new_xk_initiator(local.keypair(), peer.pubkey_full());
entry.set_rekey_state(rekey, true);
let (counter, ciphertext) =
encrypt_frame(&mut old_sender, b"old packet while rekey pending", aad);
assert_eq!(
decrypt_current(&mut entry, &ciphertext, counter, aad).unwrap(),
b"old packet while rekey pending"
);
entry.set_pending_session(new_receiver);
let (counter, ciphertext) =
encrypt_frame(&mut old_sender, b"old packet before cutover", aad);
assert_eq!(
decrypt_current(&mut entry, &ciphertext, counter, aad).unwrap(),
b"old packet before cutover"
);
assert!(entry.cutover_to_new_session(2000));
let (old_counter, old_ciphertext) =
encrypt_frame(&mut old_sender, b"old packet after cutover", aad);
assert!(decrypt_current(&mut entry, &old_ciphertext, old_counter, aad).is_err());
assert_eq!(
entry
.previous_noise_session_mut()
.expect("old session should be retained for drain")
.decrypt_with_replay_check_and_aad(&old_ciphertext, old_counter, aad)
.unwrap(),
b"old packet after cutover"
);
let (new_counter, new_ciphertext) =
encrypt_frame(&mut new_sender, b"new packet after cutover", aad);
assert_eq!(
decrypt_current(&mut entry, &new_ciphertext, new_counter, aad).unwrap(),
b"new packet after cutover"
);
}
}