use crate::NodeAddr;
use crate::node::Node;
#[cfg(any(target_os = "linux", target_os = "macos"))]
use crate::transport::TransportHandle;
#[cfg(any(target_os = "linux", target_os = "macos"))]
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
#[cfg(any(target_os = "linux", target_os = "macos"))]
use tracing::{debug, info, warn};
impl Node {
pub(in crate::node) async fn activate_connected_udp_sessions(&mut self) {
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
{
if !connected_udp_enabled() {
return;
}
let candidates: Vec<NodeAddr> = self
.peers
.iter()
.filter_map(|(addr, peer)| {
let has_session = peer.noise_session().is_some();
let has_transport = peer.transport_id().is_some();
let has_addr = peer.current_addr().is_some();
let already_active = peer.connected_udp().is_some();
if has_session && has_transport && has_addr && !already_active {
Some(*addr)
} else {
None
}
})
.collect();
for addr in candidates {
if let Err(e) = self.activate_connected_udp_for_peer(&addr).await {
static FAILURES: AtomicU64 = AtomicU64::new(0);
crate::perf_profile::record_event(
crate::perf_profile::Event::ConnectedUdpActivationFailed,
);
let n = FAILURES.fetch_add(1, Relaxed);
if n < 8 || n.is_multiple_of(1000) {
warn!(peer = %addr, error = %e, failures = n + 1, "connected UDP activation deferred");
} else {
debug!(peer = %addr, error = %e, "connected UDP activation deferred");
}
}
}
}
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
async fn activate_connected_udp_for_peer(
&mut self,
node_addr: &NodeAddr,
) -> Result<(), String> {
let (transport_id, peer_transport_addr) = {
let Some(peer) = self.peers.get(node_addr) else {
return Ok(());
};
if peer.connected_udp().is_some() {
return Ok(()); }
let Some(tid) = peer.transport_id() else {
return Ok(());
};
let Some(addr) = peer.current_addr().cloned() else {
return Ok(());
};
(tid, addr)
};
let (peer_socket_addr, local_addr, recv_buf, send_buf, packet_tx) = {
let Some(transport) = self.transports.get(&transport_id) else {
return Ok(());
};
let udp = match transport {
TransportHandle::Udp(u) => u,
_ => return Ok(()), };
let peer_sa = udp
.resolve_for_off_task(&peer_transport_addr)
.await
.map_err(|e| format!("address resolve: {e}"))?;
let local = udp
.local_addr()
.ok_or_else(|| "udp transport not started".to_string())?;
let recv_buf = udp.recv_buf_size();
let send_buf = udp.send_buf_size();
let tx = udp.clone_packet_tx();
(peer_sa, local, recv_buf, send_buf, tx)
};
let socket = std::sync::Arc::new(
crate::transport::udp::connected_peer::ConnectedPeerSocket::open(
local_addr,
peer_socket_addr,
recv_buf,
send_buf,
)
.map_err(|e| format!("ConnectedPeerSocket::open: {e}"))?,
);
let drain = crate::transport::udp::peer_drain::PeerRecvDrain::spawn(
socket.clone(),
transport_id,
peer_socket_addr,
packet_tx,
)
.map_err(|e| format!("PeerRecvDrain::spawn: {e}"))?;
if let Some(peer) = self.peers.get_mut(node_addr) {
if peer.connected_udp().is_some() {
drop(drain);
drop(socket);
return Ok(());
}
peer.set_connected_udp(socket, drain);
crate::perf_profile::record_event(crate::perf_profile::Event::ConnectedUdpInstalled);
info!(
peer = %self.peer_display_name(node_addr),
peer_addr = %peer_socket_addr,
"connected UDP socket installed"
);
} else {
drop(drain);
drop(socket);
}
Ok(())
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
pub(in crate::node) fn clear_connected_udp_for_peer(&mut self, node_addr: &NodeAddr) {
if let Some(peer) = self.peers.get_mut(node_addr)
&& peer.connected_udp().is_some()
{
peer.clear_connected_udp();
debug!(peer = %self.peer_display_name(node_addr), "connected UDP socket cleared");
}
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
pub(in crate::node) fn clear_connected_udp_for_peer(&mut self, _node_addr: &NodeAddr) {}
}
#[cfg(target_os = "linux")]
fn connected_udp_enabled() -> bool {
env_flag("FIPS_CONNECTED_UDP").unwrap_or(true)
}
#[cfg(target_os = "macos")]
fn connected_udp_enabled() -> bool {
env_flag("FIPS_MACOS_CONNECTED_UDP")
.or_else(|| env_flag("FIPS_CONNECTED_UDP"))
.unwrap_or(true)
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
fn env_flag(name: &str) -> Option<bool> {
let value = std::env::var(name).ok()?;
match value.trim().to_ascii_lowercase().as_str() {
"1" | "true" | "yes" | "on" => Some(true),
"0" | "false" | "no" | "off" => Some(false),
_ => None,
}
}