use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock as ParkingRwLock;
use rand::RngCore;
use tokio::sync::{RwLock, broadcast, mpsc, oneshot};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::ack_frame::{
AckControlOutcome, ReceiveRejectReason, decode_ack_control, decode_ack_payload,
decode_probe_request, encode_ack_control, encode_ack_payload, encode_probe_request,
};
use crate::bootstrap_cache::{
BootstrapCache, BootstrapTokenStore, CachedPeer, PeerCapabilities, PeerSource,
};
use crate::bounded_pending_buffer::BoundedPendingBuffer;
use crate::connection_router::{ConnectionRouter, RouterConfig};
use crate::connection_strategy::{
ConnectionMethod, ConnectionStage, ConnectionStrategy, StrategyConfig,
};
use crate::constrained::ConnectionId as ConstrainedConnectionId;
use crate::constrained::EngineEvent;
use crate::coordinator_control::{clear_live_request, take_live_rejection};
use crate::crypto::raw_public_keys::key_utils::{
derive_peer_id_from_public_key, generate_ml_dsa_keypair,
};
use crate::happy_eyeballs::{self, HappyEyeballsConfig};
use crate::mdns::{MdnsPeerRecord, MdnsRuntimeEvent, MdnsSnapshot, spawn_mdns_runtime};
pub use crate::nat_traversal_api::TraversalPhase;
use crate::nat_traversal_api::{
NatTraversalEndpoint, NatTraversalError, NatTraversalEvent, PeerId, TraversalFailureReason,
};
use crate::peer_directory::{PeerDirectorySnapshot, PeerDiscoverySource};
use crate::port_mapping::{PortMappingEvent, PortMappingSnapshot, spawn_best_effort_port_mapping};
use crate::reachability::{ReachabilityScope, TraversalMethod, socket_addr_scope};
use crate::transport::{ProtocolEngine, TransportAddr, TransportRegistry};
use crate::unified_config::{AutoConnectPolicy, P2pConfig, TrustPolicy};
use crate::{ConnectionCloseReason, Side};
use crate::SHUTDOWN_DRAIN_TIMEOUT;
const EVENT_CHANNEL_CAPACITY: usize = 256;
const PEER_EVENT_CHANNEL_CAPACITY: usize = 256;
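/// Derives a synthetic, non-cryptographic `PeerId` from a transport address.
///
/// Only the first 16 bytes carry entropy (the same 64-bit hash written in
/// little- and then big-endian order); the rest stay zero. This is a
/// placeholder identity for peers that have not yet authenticated, not a
/// collision-resistant identifier.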
fn peer_id_from_transport_addr(addr: &TransportAddr) -> PeerId {
let mut hasher = DefaultHasher::new();
format!("{}", addr).hash(&mut hasher);
let hash = hasher.finish();
let mut id = [0u8; 32];
id[..8].copy_from_slice(&hash.to_le_bytes());
id[8..16].copy_from_slice(&hash.to_be_bytes());
PeerId(id)
}
#[derive(Debug, Clone, Default)]
struct PeerHintRecord {
addrs: Vec<SocketAddr>,
capabilities: PeerCapabilities,
}
#[derive(Debug)]
struct ReaderTaskHandle {
generation: u64,
cancel: CancellationToken,
abort_handle: tokio::task::AbortHandle,
}
#[derive(Debug, Clone, Copy)]
struct ReaderExitEvent {
peer_id: PeerId,
generation: u64,
conn_stable_id: usize,
}
impl PeerHintRecord {
fn merge(&mut self, addrs: Vec<SocketAddr>, capabilities: Option<PeerCapabilities>) {
for addr in addrs {
if !self.addrs.contains(&addr) {
self.addrs.push(addr);
}
}
if let Some(caps) = capabilities {
if caps.supports_relay {
self.capabilities.supports_relay = true;
}
if caps.supports_coordination {
self.capabilities.supports_coordination = true;
}
self.capabilities.protocols.extend(caps.protocols);
if caps.nat_type.is_some() {
self.capabilities.nat_type = caps.nat_type;
}
for addr in caps.external_addresses {
self.capabilities.record_external_address(addr);
}
}
}
}
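/// Ranks a direct-dial candidate as `(scope, family)`: globally routable
/// addresses beat local-network ones, which beat loopback, with unknown
/// scopes last; within the same scope, IPv6 is preferred over IPv4. Sorting
/// by the reversed tuple puts the best candidates first.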
fn direct_candidate_rank(addr: SocketAddr) -> (u8, u8) {
let scope_rank = match socket_addr_scope(addr) {
Some(ReachabilityScope::Global) => 3,
Some(ReachabilityScope::LocalNetwork) => 2,
Some(ReachabilityScope::Loopback) => 1,
None => 0,
};
let family_rank = if addr.is_ipv6() { 2 } else { 1 };
(scope_rank, family_rank)
}
fn prioritize_direct_candidate_addrs(addrs: &mut Vec<SocketAddr>) {
    addrs.sort_by_key(|addr| std::cmp::Reverse(direct_candidate_rank(*addr)));
    // `Vec::dedup` only drops adjacent duplicates, and equal-ranked duplicates
    // are not guaranteed to be adjacent after the sort, so dedupe explicitly.
    let mut seen = HashSet::new();
    addrs.retain(|addr| seen.insert(*addr));
}
fn drop_non_global_direct_candidates_when_global_present(addrs: &mut Vec<SocketAddr>) {
let has_global = addrs
.iter()
.any(|addr| socket_addr_scope(*addr) == Some(ReachabilityScope::Global));
if has_global {
addrs.retain(|addr| socket_addr_scope(*addr) == Some(ReachabilityScope::Global));
}
}
fn relay_target_rank(addr: SocketAddr) -> u8 {
match socket_addr_scope(addr) {
Some(ReachabilityScope::Global) => 3,
Some(ReachabilityScope::LocalNetwork) => 2,
Some(ReachabilityScope::Loopback) => 1,
None => 0,
}
}
fn prioritize_relay_target_addrs(addrs: &mut Vec<SocketAddr>) {
    addrs.sort_by_key(|addr| std::cmp::Reverse(relay_target_rank(*addr)));
    // As above: dedupe explicitly rather than relying on adjacency.
    let mut seen = HashSet::new();
    addrs.retain(|addr| seen.insert(*addr));
}
fn extend_unique_socket_addrs(
addrs: &mut Vec<SocketAddr>,
incoming: impl IntoIterator<Item = SocketAddr>,
) {
for addr in incoming {
if !addrs.contains(&addr) {
addrs.push(addr);
}
}
}
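/// Tries each address sequentially under one shared time budget for the whole
/// family. Each attempt is capped by whatever budget remains, so a slow early
/// address shrinks the window for later ones. A fast failure moves on to the
/// next address, while a timeout ends the family early because the remaining
/// budget is known to be spent.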
async fn try_addrs_with_shared_stage_budget<T, E, F, Fut>(
addresses: &[SocketAddr],
family_name: &str,
stage_budget: Duration,
mut attempt: F,
) -> Option<T>
where
F: FnMut(SocketAddr) -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
if addresses.is_empty() {
debug!("{}: No addresses to try", family_name);
return None;
}
if stage_budget.is_zero() {
debug!("{}: zero direct-stage budget, skipping family", family_name);
return None;
}
debug!(
"Trying {} {} addresses within a shared {:?} budget",
addresses.len(),
family_name,
stage_budget
);
let stage_deadline = Instant::now() + stage_budget;
for (idx, addr) in addresses.iter().enumerate() {
let remaining_budget = stage_deadline.saturating_duration_since(Instant::now());
if remaining_budget.is_zero() {
debug!(
"{}: shared direct-stage budget {:?} exhausted before address {}",
family_name, stage_budget, addr
);
break;
}
debug!(
" {} attempt {}/{}: {} (remaining budget: {:?})",
family_name,
idx + 1,
addresses.len(),
addr,
remaining_budget
);
match timeout(remaining_budget, attempt(*addr)).await {
Ok(Ok(result)) => {
info!("✓ {} connection successful to {}", family_name, addr);
return Some(result);
}
Ok(Err(error)) => {
debug!(" {} to {} failed: {}", family_name, addr, error);
}
Err(_) => {
debug!(
" {} to {} timed out after consuming the remaining {:?} family budget",
family_name, addr, remaining_budget
);
break;
}
}
}
debug!("{}: All {} addresses failed", family_name, addresses.len());
None
}
fn cached_peer_avg_rtt(peer: &CachedPeer) -> Option<Duration> {
(peer.stats.avg_rtt_ms > 0).then(|| Duration::from_millis(u64::from(peer.stats.avg_rtt_ms)))
}
fn cached_peer_matches_strategy_addr(peer: &CachedPeer, addr: SocketAddr) -> bool {
peer.addresses.contains(&addr)
|| peer.preferred_addresses().contains(&addr)
|| peer.capabilities.external_addresses.contains(&addr)
|| peer
.capabilities
.reachable_addresses
.iter()
.any(|record| record.address == addr)
}
fn strategy_rtt_hint_from_cached_peers(
peers: &[CachedPeer],
addrs: &[SocketAddr],
) -> Option<Duration> {
peers
.iter()
.filter(|peer| {
addrs
.iter()
.copied()
.any(|addr| cached_peer_matches_strategy_addr(peer, addr))
})
.filter_map(cached_peer_avg_rtt)
.max()
}
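/// Picks the best relay target address: advertised listeners first, then
/// validated reachable addresses, then observed external addresses, each
/// group sorted by reachability scope. The IPv4 and IPv6 fallbacks are used
/// only when all three groups are empty.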
fn select_preferred_relay_target_addr(
listener_addrs: &[SocketAddr],
reachable_addrs: &[SocketAddr],
external_addrs: &[SocketAddr],
fallback_ipv4: Option<SocketAddr>,
fallback_ipv6: Option<SocketAddr>,
) -> Option<SocketAddr> {
let mut ordered = Vec::new();
let mut listeners = listener_addrs.to_vec();
prioritize_relay_target_addrs(&mut listeners);
extend_unique_socket_addrs(&mut ordered, listeners);
let mut reachable = reachable_addrs.to_vec();
prioritize_relay_target_addrs(&mut reachable);
extend_unique_socket_addrs(&mut ordered, reachable);
let mut external = external_addrs.to_vec();
prioritize_relay_target_addrs(&mut external);
extend_unique_socket_addrs(&mut ordered, external);
ordered
.into_iter()
.next()
.or(fallback_ipv4)
.or(fallback_ipv6)
}
fn normalize_direct_path_unavailable_reason(
error: &NatTraversalError,
) -> DirectPathUnavailableReason {
match error {
NatTraversalError::NoCandidatesFound | NatTraversalError::CandidateDiscoveryFailed(_) => {
DirectPathUnavailableReason::NoCandidates
}
NatTraversalError::HolePunchingFailed
| NatTraversalError::PunchingFailed(_)
| NatTraversalError::ValidationFailed(_)
| NatTraversalError::ValidationTimeout
| NatTraversalError::NetworkError(_)
| NatTraversalError::Timeout
| NatTraversalError::ConnectionFailed(_)
| NatTraversalError::TraversalFailed(_) => DirectPathUnavailableReason::NatUnreachable,
_ => DirectPathUnavailableReason::Unknown,
}
}
fn normalize_direct_path_unavailable_reason_from_traversal_reason(
reason: &TraversalFailureReason,
) -> DirectPathUnavailableReason {
match reason {
TraversalFailureReason::DiscoveryExhausted => DirectPathUnavailableReason::NoCandidates,
TraversalFailureReason::CoordinatorUnavailable
| TraversalFailureReason::CoordinationRejected { .. }
| TraversalFailureReason::CoordinationExpired
| TraversalFailureReason::SynchronizationExpired
| TraversalFailureReason::PunchWindowMissed
| TraversalFailureReason::ValidationTimedOut
| TraversalFailureReason::ValidationFailed
| TraversalFailureReason::ConnectionFailed
| TraversalFailureReason::NetworkError(_)
| TraversalFailureReason::ShuttingDown => DirectPathUnavailableReason::NatUnreachable,
TraversalFailureReason::ProtocolViolation(_) => DirectPathUnavailableReason::Unknown,
}
}
fn is_terminal_direct_path_status(status: &DirectPathStatus) -> bool {
matches!(
status,
DirectPathStatus::BestEffortUnavailable { .. } | DirectPathStatus::Failed { .. }
)
}
fn terminal_direct_path_status_from_failure(
reason: &TraversalFailureReason,
fallback_available: bool,
) -> DirectPathStatus {
if fallback_available {
DirectPathStatus::BestEffortUnavailable {
reason: normalize_direct_path_unavailable_reason_from_traversal_reason(reason),
}
} else {
DirectPathStatus::Failed {
error: reason.to_string(),
}
}
}
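/// Records a per-peer direct-path status and broadcasts it only when it
/// actually changed, so subscribers never see duplicate transitions.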
fn publish_direct_path_status(
statuses: &ParkingRwLock<HashMap<PeerId, DirectPathStatus>>,
event_tx: &broadcast::Sender<P2pEvent>,
peer_id: PeerId,
status: DirectPathStatus,
) {
let should_emit = {
let mut statuses = statuses.write();
if statuses.get(&peer_id) == Some(&status) {
false
} else {
statuses.insert(peer_id, status.clone());
true
}
};
if should_emit {
if let Err(e) = event_tx.send(P2pEvent::DirectPathStatus { peer_id, status }) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_direct_path_status",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
}
}
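/// Like `peer_id_from_transport_addr`, but for bare socket addresses: bytes
/// 0..8 hold the address hash and bytes 8..10 are overwritten with the port.
/// Again a placeholder rather than an authenticated identity.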
fn peer_id_from_socket_addr(addr: SocketAddr) -> PeerId {
let mut hasher = DefaultHasher::new();
addr.hash(&mut hasher);
let hash = hasher.finish();
let mut id = [0u8; 32];
id[..8].copy_from_slice(&hash.to_le_bytes());
id[8..10].copy_from_slice(&addr.port().to_le_bytes());
PeerId(id)
}
pub struct P2pEndpoint {
inner: Arc<NatTraversalEndpoint>,
connected_peers: Arc<RwLock<HashMap<PeerId, PeerConnection>>>,
stats: Arc<RwLock<EndpointStats>>,
config: P2pConfig,
event_tx: broadcast::Sender<P2pEvent>,
peer_id: PeerId,
public_key: Vec<u8>,
shutdown: CancellationToken,
pending_data: Arc<RwLock<BoundedPendingBuffer>>,
pub bootstrap_cache: Arc<BootstrapCache>,
peer_hint_records: Arc<RwLock<HashMap<PeerId, PeerHintRecord>>>,
transport_registry: Arc<TransportRegistry>,
router: Arc<RwLock<ConnectionRouter>>,
constrained_connections: Arc<RwLock<HashMap<PeerId, ConstrainedConnectionId>>>,
constrained_peer_addrs: Arc<RwLock<HashMap<ConstrainedConnectionId, (PeerId, TransportAddr)>>>,
manual_known_peer_udp_addrs: Arc<RwLock<Vec<SocketAddr>>>,
port_mapping_state: Arc<ParkingRwLock<PortMappingSnapshot>>,
mdns_state: Arc<ParkingRwLock<MdnsSnapshot>>,
mdns_auto_connect_inflight: Arc<ParkingRwLock<HashSet<String>>>,
direct_path_statuses: Arc<ParkingRwLock<HashMap<PeerId, DirectPathStatus>>>,
data_tx: mpsc::Sender<(PeerId, Vec<u8>)>,
data_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(PeerId, Vec<u8>)>>>,
reader_exit_tx: mpsc::UnboundedSender<ReaderExitEvent>,
reader_exit_rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<ReaderExitEvent>>>,
reader_handles: Arc<RwLock<HashMap<PeerId, Vec<ReaderTaskHandle>>>>,
peer_activity: Arc<RwLock<HashMap<PeerId, PeerActivityRecord>>>,
ack_waiters: Arc<ParkingRwLock<HashMap<usize, AckWaiterMap>>>,
peer_event_tx: broadcast::Sender<(PeerId, PeerLifecycleEvent)>,
peer_event_channels: Arc<ParkingRwLock<HashMap<PeerId, broadcast::Sender<PeerLifecycleEvent>>>>,
peer_event_generations: Arc<ParkingRwLock<HashMap<PeerId, u64>>>,
pub(crate) coordinator_health: Arc<crate::coordinator_health::CoordinatorHealth>,
}
impl std::fmt::Debug for P2pEndpoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("P2pEndpoint")
.field("peer_id", &self.peer_id)
.field("config", &self.config)
.finish_non_exhaustive()
}
}
#[derive(Debug, Clone)]
pub struct PeerConnection {
pub peer_id: PeerId,
pub remote_addr: TransportAddr,
pub traversal_method: TraversalMethod,
pub side: Side,
pub authenticated: bool,
pub connected_at: Instant,
pub last_activity: Instant,
}
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
pub bytes_sent: u64,
pub bytes_received: u64,
pub rtt: Option<Duration>,
pub packet_loss: f64,
pub last_activity: Option<Instant>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConnectionHealth {
pub connected: bool,
pub generation: Option<u64>,
pub reader_task_active: Option<bool>,
pub last_received_at: Option<Instant>,
pub last_sent_at: Option<Instant>,
pub idle_for: Option<Duration>,
pub close_reason: Option<ConnectionCloseReason>,
}
#[derive(Debug, Clone, Copy, Default)]
struct ConnectionHealthObservation {
connected: bool,
generation: Option<u64>,
reader_task_active: Option<bool>,
last_received_at: Option<Instant>,
last_sent_at: Option<Instant>,
close_reason: Option<ConnectionCloseReason>,
}
impl ConnectionHealth {
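    /// Converts a raw observation into a point-in-time health snapshot.
    /// `idle_for` is populated only while connected, measured from the most
    /// recent send or receive, whichever happened later.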
fn from_observation(observation: ConnectionHealthObservation, now: Instant) -> Self {
let last_live_activity = match (observation.last_sent_at, observation.last_received_at) {
(Some(sent), Some(received)) => Some(sent.max(received)),
(Some(sent), None) => Some(sent),
(None, Some(received)) => Some(received),
(None, None) => None,
};
Self {
connected: observation.connected,
generation: observation.generation,
reader_task_active: observation.reader_task_active,
last_received_at: observation.last_received_at,
last_sent_at: observation.last_sent_at,
idle_for: observation
.connected
.then(|| last_live_activity.map(|instant| now.saturating_duration_since(instant)))
.flatten(),
close_reason: observation.close_reason,
}
}
}
#[derive(Debug, Clone, Copy, Default)]
struct PeerActivityRecord {
last_sent_at: Option<Instant>,
last_received_at: Option<Instant>,
}
#[derive(Debug, Clone, Copy)]
enum PeerActivityKind {
Sent,
Received,
}
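/// Outcome delivered to a task waiting on a 16-byte ack tag: the remote
/// accepted the payload, rejected it with a reason, or the connection closed
/// before any acknowledgement arrived. Waiters are keyed per connection
/// stable id in an `AckWaiterMap`.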
#[derive(Debug)]
enum AckWaiterResult {
Accepted,
Rejected(ReceiveRejectReason),
Closed(ConnectionCloseReason),
}
type AckWaiterMap = HashMap<[u8; 16], oneshot::Sender<AckWaiterResult>>;
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeAssistSnapshot {
pub successful_coordinations: u32,
pub active_relay_sessions: usize,
pub relay_bytes_forwarded: u64,
}
#[derive(Debug, Clone)]
pub struct EndpointStats {
pub active_connections: usize,
pub successful_connections: u64,
pub failed_connections: u64,
pub nat_traversal_attempts: u64,
pub nat_traversal_successes: u64,
pub direct_connections: u64,
pub active_direct_incoming_connections: u64,
pub last_direct_loopback_at: Option<Instant>,
pub last_direct_local_at: Option<Instant>,
pub last_direct_global_at: Option<Instant>,
pub relayed_connections: u64,
pub total_bootstrap_nodes: usize,
pub connected_bootstrap_nodes: usize,
pub start_time: Instant,
pub average_coordination_time: Duration,
}
impl Default for EndpointStats {
fn default() -> Self {
Self {
active_connections: 0,
successful_connections: 0,
failed_connections: 0,
nat_traversal_attempts: 0,
nat_traversal_successes: 0,
direct_connections: 0,
active_direct_incoming_connections: 0,
last_direct_loopback_at: None,
last_direct_local_at: None,
last_direct_global_at: None,
relayed_connections: 0,
total_bootstrap_nodes: 0,
connected_bootstrap_nodes: 0,
start_time: Instant::now(),
average_coordination_time: Duration::ZERO,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerLifecycleEvent {
Established {
generation: u64,
},
Replaced {
old_generation: u64,
new_generation: u64,
},
Closing {
generation: u64,
reason: ConnectionCloseReason,
},
Closed {
generation: u64,
reason: ConnectionCloseReason,
},
ReaderExited {
generation: u64,
},
}
#[derive(Debug, Clone)]
pub enum P2pEvent {
PeerConnected {
peer_id: PeerId,
addr: TransportAddr,
side: Side,
traversal_method: TraversalMethod,
},
PeerDisconnected {
peer_id: PeerId,
reason: DisconnectReason,
},
NatTraversalProgress {
peer_id: PeerId,
phase: TraversalPhase,
},
ExternalAddressDiscovered {
addr: TransportAddr,
},
PeerAddressUpdated {
peer_addr: SocketAddr,
advertised_addr: SocketAddr,
},
RelayEstablished {
relay_addr: SocketAddr,
},
PortMappingEstablished {
external_addr: SocketAddr,
},
PortMappingRenewed {
external_addr: SocketAddr,
},
PortMappingAddressChanged {
previous_addr: SocketAddr,
external_addr: SocketAddr,
},
PortMappingFailed {
error: String,
},
PortMappingRemoved {
external_addr: Option<SocketAddr>,
},
MdnsServiceAdvertised {
service: String,
namespace: Option<String>,
instance_fullname: String,
},
MdnsPeerDiscovered {
peer: MdnsPeerRecord,
},
MdnsPeerUpdated {
peer: MdnsPeerRecord,
},
MdnsPeerRemoved {
peer: MdnsPeerRecord,
},
MdnsPeerEligible {
peer: MdnsPeerRecord,
},
MdnsPeerIneligible {
peer: MdnsPeerRecord,
reason: String,
},
MdnsPeerApprovalRequired {
peer: MdnsPeerRecord,
reason: String,
},
MdnsAutoConnectAttempted {
peer: MdnsPeerRecord,
addresses: Vec<SocketAddr>,
},
MdnsAutoConnectSucceeded {
peer: MdnsPeerRecord,
authenticated_peer_id: PeerId,
remote_addr: TransportAddr,
},
MdnsAutoConnectFailed {
peer: MdnsPeerRecord,
addresses: Vec<SocketAddr>,
error: String,
},
DirectPathStatus {
peer_id: PeerId,
status: DirectPathStatus,
},
BootstrapStatus {
connected: usize,
total: usize,
},
PeerAuthenticated {
peer_id: PeerId,
},
DataReceived {
peer_id: PeerId,
bytes: usize,
},
ConstrainedDataReceived {
remote_addr: TransportAddr,
connection_id: u16,
data: Vec<u8>,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DirectPathStatus {
Pending,
Established {
remote_addr: SocketAddr,
},
BestEffortUnavailable {
reason: DirectPathUnavailableReason,
},
Failed {
error: String,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DirectPathUnavailableReason {
NoCandidates,
NatUnreachable,
RelayRequired,
Unknown,
}
#[derive(Debug, Clone)]
pub enum DisconnectReason {
Normal,
Timeout,
ProtocolError(String),
AuthenticationFailed,
ConnectionLost,
RemoteClosed,
}
fn close_reason_from_connection(
connection: &crate::high_level::Connection,
) -> Option<ConnectionCloseReason> {
connection
.close_reason()
.as_ref()
.map(ConnectionCloseReason::from_connection_error)
}
fn endpoint_error_from_connection_error(error: crate::ConnectionError) -> EndpointError {
EndpointError::ConnectionClosed {
reason: ConnectionCloseReason::from_connection_error(&error),
}
}
fn endpoint_error_from_write_error(error: crate::high_level::WriteError) -> EndpointError {
match error {
crate::high_level::WriteError::ConnectionLost(error) => {
endpoint_error_from_connection_error(error)
}
other => EndpointError::Connection(other.to_string()),
}
}
fn close_reason_for_disconnect(reason: &DisconnectReason) -> ConnectionCloseReason {
match reason {
DisconnectReason::Normal => ConnectionCloseReason::LifecycleCleanup,
DisconnectReason::Timeout => ConnectionCloseReason::TimedOut,
DisconnectReason::ProtocolError(_) => ConnectionCloseReason::LifecycleCleanup,
DisconnectReason::AuthenticationFailed => ConnectionCloseReason::Banned,
DisconnectReason::ConnectionLost => ConnectionCloseReason::ReaderExit,
DisconnectReason::RemoteClosed => ConnectionCloseReason::ConnectionClosed,
}
}
#[derive(Debug, thiserror::Error)]
pub enum EndpointError {
#[error("Configuration error: {0}")]
Config(String),
#[error("Connection error: {0}")]
Connection(String),
#[error("Connection closed: {reason}")]
ConnectionClosed {
reason: ConnectionCloseReason,
},
#[error("NAT traversal error: {0}")]
NatTraversal(#[from] NatTraversalError),
#[error("Authentication error: {0}")]
Authentication(String),
#[error("Operation timed out")]
Timeout,
#[error("Feature not supported by peer or transport")]
NotSupported,
#[error("Timed out waiting for remote receive acknowledgement")]
AckTimeout,
#[error("Timed out waiting for peer liveness probe response")]
ProbeTimeout,
#[error("Remote receive pipeline rejected payload: {reason}")]
ReceiveRejected {
reason: ReceiveRejectReason,
},
#[error("Peer not found: {0:?}")]
PeerNotFound(PeerId),
#[error("Already connected to peer: {0:?}")]
AlreadyConnected(PeerId),
#[error("Endpoint is shutting down")]
ShuttingDown,
#[error("All connection strategies failed: {0}")]
AllStrategiesFailed(String),
#[error("No target address provided")]
NoAddress,
}
#[derive(Debug)]
enum HolePunchAwaitError {
TraversalFailure(TraversalFailureReason),
Endpoint(EndpointError),
}
impl HolePunchAwaitError {
fn retry_reason(&self) -> Option<&TraversalFailureReason> {
match self {
Self::TraversalFailure(reason) => Some(reason),
Self::Endpoint(_) => None,
}
}
fn from_nat_traversal_error(error: NatTraversalError) -> Self {
match TraversalFailureReason::from_public_operation_error(&error) {
Some(reason) => Self::TraversalFailure(reason),
None => Self::Endpoint(EndpointError::NatTraversal(error)),
}
}
}
impl std::fmt::Display for HolePunchAwaitError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TraversalFailure(reason) => write!(f, "{reason}"),
Self::Endpoint(error) => write!(f, "{error}"),
}
}
}
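/// Tears down a peer connection in a fixed order: emit `Closing`, drop the
/// underlying connection and its direct-path status, emit `Closed`, fail any
/// in-flight ack waiters keyed by the connection's stable id, cancel and
/// abort reader tasks, and finally remove the peer entry (which publishes
/// `PeerDisconnected`). Returns whether a peer entry was actually removed.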
async fn do_cleanup_connection(
connected_peers: &RwLock<HashMap<PeerId, PeerConnection>>,
inner: &NatTraversalEndpoint,
reader_handles: &RwLock<HashMap<PeerId, Vec<ReaderTaskHandle>>>,
direct_path_statuses: &ParkingRwLock<HashMap<PeerId, DirectPathStatus>>,
stats: &RwLock<EndpointStats>,
event_tx: &broadcast::Sender<P2pEvent>,
peer_event_tx: &broadcast::Sender<(PeerId, PeerLifecycleEvent)>,
peer_event_channels: &ParkingRwLock<HashMap<PeerId, broadcast::Sender<PeerLifecycleEvent>>>,
peer_event_generations: &ParkingRwLock<HashMap<PeerId, u64>>,
ack_waiters: &ParkingRwLock<HashMap<usize, AckWaiterMap>>,
peer_id: &PeerId,
reason: DisconnectReason,
close_reason: ConnectionCloseReason,
) -> bool {
let lifecycle_snapshot = inner
.get_connection(peer_id)
.ok()
.flatten()
.and_then(|connection| {
inner.connection_snapshot_by_stable_id(peer_id, connection.stable_id())
});
if let Some(snapshot) = lifecycle_snapshot {
emit_peer_lifecycle_event(
peer_event_tx,
peer_event_channels,
*peer_id,
PeerLifecycleEvent::Closing {
generation: snapshot.generation,
reason: close_reason,
},
);
}
let _ = inner.remove_connection_with_reason(peer_id, close_reason);
direct_path_statuses.write().remove(peer_id);
if let Some(snapshot) = lifecycle_snapshot {
emit_peer_lifecycle_event(
peer_event_tx,
peer_event_channels,
*peer_id,
PeerLifecycleEvent::Closed {
generation: snapshot.generation,
reason: close_reason,
},
);
fail_ack_waiters_for_connection(ack_waiters, snapshot.stable_id, close_reason);
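        // Intentionally unused here; this binding only silences the
        // unused-parameter lint.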
let _ = peer_event_generations;
}
if let Some(handles) = reader_handles.write().await.remove(peer_id) {
for handle in handles {
handle.cancel.cancel();
handle.abort_handle.abort();
}
}
let removed = remove_connected_peer(connected_peers, stats, event_tx, peer_id, reason).await;
if removed {
info!("Cleaned up connection for peer {:?}", peer_id);
}
removed
}
async fn record_connection_established(
stats: &RwLock<EndpointStats>,
event_tx: &broadcast::Sender<P2pEvent>,
peer_conn: &PeerConnection,
previous: Option<&PeerConnection>,
) {
let had_active_direct_incoming =
previous.is_some_and(|prev| prev.traversal_method.is_direct() && prev.side.is_server());
let has_active_direct_incoming =
peer_conn.traversal_method.is_direct() && peer_conn.side.is_server();
let should_emit = previous.is_none_or(|prev| {
prev.remote_addr != peer_conn.remote_addr
|| prev.traversal_method != peer_conn.traversal_method
|| prev.side != peer_conn.side
|| prev.authenticated != peer_conn.authenticated
});
{
let mut s = stats.write().await;
if previous.is_none() {
s.active_connections += 1;
s.successful_connections += 1;
}
if previous.is_none_or(|prev| prev.traversal_method != peer_conn.traversal_method) {
match peer_conn.traversal_method {
TraversalMethod::Direct => {
s.direct_connections += 1;
}
TraversalMethod::Relay => {
s.relayed_connections += 1;
}
TraversalMethod::HolePunch | TraversalMethod::PortPrediction => {}
}
}
if !had_active_direct_incoming && has_active_direct_incoming {
s.active_direct_incoming_connections += 1;
} else if had_active_direct_incoming && !has_active_direct_incoming {
s.active_direct_incoming_connections =
s.active_direct_incoming_connections.saturating_sub(1);
}
if has_active_direct_incoming {
if let Some(remote_addr) = peer_conn.remote_addr.as_socket_addr() {
let now = Instant::now();
match socket_addr_scope(remote_addr) {
Some(ReachabilityScope::Loopback) => {
s.last_direct_loopback_at = Some(now);
}
Some(ReachabilityScope::LocalNetwork) => {
s.last_direct_local_at = Some(now);
}
Some(ReachabilityScope::Global) => {
s.last_direct_global_at = Some(now);
}
None => {}
}
}
}
}
if should_emit {
if let Err(e) = event_tx.send(P2pEvent::PeerConnected {
peer_id: peer_conn.peer_id,
addr: peer_conn.remote_addr.clone(),
side: peer_conn.side,
traversal_method: peer_conn.traversal_method,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_peer_connected",
peer_id = ?peer_conn.peer_id,
error = %e,
"silent drop"
);
}
}
}
async fn remove_connected_peer(
connected_peers: &RwLock<HashMap<PeerId, PeerConnection>>,
stats: &RwLock<EndpointStats>,
event_tx: &broadcast::Sender<P2pEvent>,
peer_id: &PeerId,
reason: DisconnectReason,
) -> bool {
let removed = connected_peers.write().await.remove(peer_id);
if let Some(peer_conn) = removed {
{
let mut s = stats.write().await;
s.active_connections = s.active_connections.saturating_sub(1);
if peer_conn.traversal_method.is_direct() && peer_conn.side.is_server() {
s.active_direct_incoming_connections =
s.active_direct_incoming_connections.saturating_sub(1);
}
}
if let Err(e) = event_tx.send(P2pEvent::PeerDisconnected {
peer_id: *peer_id,
reason,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_peer_disconnected",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
true
} else {
false
}
}
async fn store_connected_peer(
connected_peers: &RwLock<HashMap<PeerId, PeerConnection>>,
stats: &RwLock<EndpointStats>,
event_tx: &broadcast::Sender<P2pEvent>,
peer_conn: PeerConnection,
) {
let previous = connected_peers
.write()
.await
.insert(peer_conn.peer_id, peer_conn.clone());
record_connection_established(stats, event_tx, &peer_conn, previous.as_ref()).await;
}
async fn note_peer_activity(
connected_peers: &RwLock<HashMap<PeerId, PeerConnection>>,
peer_activity: &RwLock<HashMap<PeerId, PeerActivityRecord>>,
peer_id: PeerId,
kind: PeerActivityKind,
at: Instant,
) {
if let Some(peer_conn) = connected_peers.write().await.get_mut(&peer_id) {
peer_conn.last_activity = at;
}
let mut activity = peer_activity.write().await;
let entry = activity.entry(peer_id).or_default();
match kind {
PeerActivityKind::Sent => entry.last_sent_at = Some(at),
PeerActivityKind::Received => entry.last_received_at = Some(at),
}
}
fn peer_event_sender(
peer_event_channels: &ParkingRwLock<HashMap<PeerId, broadcast::Sender<PeerLifecycleEvent>>>,
peer_id: PeerId,
) -> broadcast::Sender<PeerLifecycleEvent> {
if let Some(sender) = peer_event_channels.read().get(&peer_id).cloned() {
return sender;
}
let mut channels = peer_event_channels.write();
channels
.entry(peer_id)
.or_insert_with(|| broadcast::channel(PEER_EVENT_CHANNEL_CAPACITY).0)
.clone()
}
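/// Fans a lifecycle event out to both the global `(PeerId, event)` stream and
/// the per-peer channel, if one exists. Broadcast send errors only mean there
/// are currently no subscribers; they are logged as silent drops rather than
/// treated as failures.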
fn emit_peer_lifecycle_event(
peer_event_tx: &broadcast::Sender<(PeerId, PeerLifecycleEvent)>,
peer_event_channels: &ParkingRwLock<HashMap<PeerId, broadcast::Sender<PeerLifecycleEvent>>>,
peer_id: PeerId,
event: PeerLifecycleEvent,
) {
if let Err(e) = peer_event_tx.send((peer_id, event.clone())) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "peer_event_tx_lifecycle",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
if let Some(sender) = peer_event_channels.read().get(&peer_id).cloned() {
if let Err(e) = sender.send(event) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "per_peer_lifecycle_broadcast",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
}
}
fn register_ack_waiter(
ack_waiters: &ParkingRwLock<HashMap<usize, AckWaiterMap>>,
stable_id: usize,
tag: [u8; 16],
tx: oneshot::Sender<AckWaiterResult>,
) -> bool {
let mut waiters = ack_waiters.write();
let entry = waiters.entry(stable_id).or_default();
if entry.contains_key(&tag) {
return false;
}
entry.insert(tag, tx);
true
}
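/// Removes and completes the waiter registered for `(stable_id, tag)`,
/// pruning the per-connection map when it empties. The oneshot sender is
/// extracted under the write lock but fired after the lock is released.
/// Returns `true` if a waiter was present.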
fn resolve_ack_waiter(
ack_waiters: &ParkingRwLock<HashMap<usize, AckWaiterMap>>,
stable_id: usize,
tag: [u8; 16],
result: AckWaiterResult,
) -> bool {
let tx = {
let mut waiters = ack_waiters.write();
let sender = waiters
.get_mut(&stable_id)
.and_then(|entry| entry.remove(&tag));
if waiters.get(&stable_id).is_some_and(HashMap::is_empty) {
waiters.remove(&stable_id);
}
sender
};
if let Some(tx) = tx {
if let Err(_dropped) = tx.send(result) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "ack_waiter_oneshot",
stable_id = stable_id,
"silent drop"
);
}
true
} else {
false
}
}
fn fail_ack_waiters_for_connection(
ack_waiters: &ParkingRwLock<HashMap<usize, AckWaiterMap>>,
stable_id: usize,
reason: ConnectionCloseReason,
) {
let waiters = ack_waiters.write().remove(&stable_id);
if let Some(waiters) = waiters {
for (_, tx) in waiters {
if let Err(_dropped) = tx.send(AckWaiterResult::Closed(reason)) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "ack_waiter_close_oneshot",
stable_id = stable_id,
"silent drop"
);
}
}
}
}
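/// Translates low-level NAT traversal events into endpoint statistics,
/// per-peer direct-path status updates, and public `P2pEvent`s. Terminal
/// failures are sticky: once a peer's direct path is `Failed` or
/// `BestEffortUnavailable`, later `TraversalFailed` events for that peer are
/// ignored.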
async fn bridge_nat_traversal_event(
stats: &RwLock<EndpointStats>,
event_tx: &broadcast::Sender<P2pEvent>,
direct_path_statuses: &ParkingRwLock<HashMap<PeerId, DirectPathStatus>>,
event: NatTraversalEvent,
) {
match event {
NatTraversalEvent::CoordinationRequested { .. } => {
stats.write().await.nat_traversal_attempts += 1;
}
NatTraversalEvent::ConnectionEstablished {
peer_id,
remote_address,
..
} => {
stats.write().await.nat_traversal_successes += 1;
publish_direct_path_status(
direct_path_statuses,
event_tx,
peer_id,
DirectPathStatus::Established {
remote_addr: remote_address,
},
);
}
NatTraversalEvent::TraversalTerminated {
peer_id,
reason,
fallback_available,
} => {
stats.write().await.failed_connections += 1;
let status = terminal_direct_path_status_from_failure(&reason, fallback_available);
publish_direct_path_status(direct_path_statuses, event_tx, peer_id, status);
if let Err(e) = event_tx.send(P2pEvent::NatTraversalProgress {
peer_id,
phase: TraversalPhase::Failed,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_nat_progress_failed",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
}
NatTraversalEvent::TraversalFailed {
peer_id,
error,
fallback_available,
} => {
let already_terminal = direct_path_statuses
.read()
.get(&peer_id)
.is_some_and(is_terminal_direct_path_status);
if already_terminal {
return;
}
stats.write().await.failed_connections += 1;
let status = if fallback_available {
DirectPathStatus::BestEffortUnavailable {
reason: normalize_direct_path_unavailable_reason(&error),
}
} else {
DirectPathStatus::Failed {
error: error.to_string(),
}
};
publish_direct_path_status(direct_path_statuses, event_tx, peer_id, status);
if let Err(e) = event_tx.send(P2pEvent::NatTraversalProgress {
peer_id,
phase: TraversalPhase::Failed,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_nat_progress_failed",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
}
NatTraversalEvent::PhaseTransition {
peer_id, to_phase, ..
} => {
if !matches!(to_phase, TraversalPhase::Connected | TraversalPhase::Failed) {
publish_direct_path_status(
direct_path_statuses,
event_tx,
peer_id,
DirectPathStatus::Pending,
);
}
if let Err(e) = event_tx.send(P2pEvent::NatTraversalProgress {
peer_id,
phase: to_phase,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_nat_progress_phase",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
}
NatTraversalEvent::ExternalAddressDiscovered { address, .. } => {
info!("External address discovered: {}", address);
if let Err(e) = event_tx.send(P2pEvent::ExternalAddressDiscovered {
addr: TransportAddr::Udp(address),
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_external_addr",
addr = ?address,
error = %e,
"silent drop"
);
}
}
_ => {}
}
}
impl P2pEndpoint {
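    /// Builds a `P2pEndpoint`: derives (or generates) the ML-DSA identity,
    /// opens the bootstrap cache, binds sockets (preferring true dual-stack),
    /// constructs the NAT traversal endpoint, and spawns the background
    /// maintenance tasks.
    ///
    /// A minimal usage sketch (illustrative only; it assumes `P2pConfig` has
    /// usable defaults for your environment):
    ///
    /// ```ignore
    /// let endpoint = P2pEndpoint::new(P2pConfig::default()).await?;
    /// println!("local peer: {:?}", endpoint.peer_id());
    /// let conn = endpoint.connect_addr("203.0.113.7:9000".parse()?).await?;
    /// ```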
pub async fn new(config: P2pConfig) -> Result<Self, EndpointError> {
let (public_key, secret_key) = match config.keypair.clone() {
Some(keypair) => keypair,
None => generate_ml_dsa_keypair().map_err(|e| {
EndpointError::Config(format!("Failed to generate ML-DSA-65 keypair: {e:?}"))
})?,
};
let peer_id = derive_peer_id_from_public_key(&public_key);
info!("Creating P2P endpoint with peer ID: {:?}", peer_id);
let public_key_bytes: Vec<u8> = public_key.as_bytes().to_vec();
let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
let event_tx_clone = event_tx.clone();
let stats = Arc::new(RwLock::new(EndpointStats {
total_bootstrap_nodes: config.known_peers.len(),
start_time: Instant::now(),
..Default::default()
}));
let stats_clone = Arc::clone(&stats);
let direct_path_statuses = Arc::new(ParkingRwLock::new(HashMap::new()));
let direct_path_statuses_clone = Arc::clone(&direct_path_statuses);
let event_callback = Box::new(move |event: NatTraversalEvent| {
let event_tx = event_tx_clone.clone();
let stats = stats_clone.clone();
let direct_path_statuses = direct_path_statuses_clone.clone();
tokio::spawn(async move {
bridge_nat_traversal_event(
stats.as_ref(),
&event_tx,
direct_path_statuses.as_ref(),
event,
)
.await;
});
});
let mut nat_config = config.to_nat_config_with_key(public_key.clone(), secret_key);
let bootstrap_cache = Arc::new(
BootstrapCache::open(config.bootstrap_cache.clone())
.await
.map_err(|e| {
EndpointError::Config(format!("Failed to open bootstrap cache: {}", e))
})?,
);
let token_store = Arc::new(BootstrapTokenStore::new(bootstrap_cache.clone()).await);
use crate::high_level::runtime::AsyncUdpSocket;
let requested_port = config
.bind_addr
.as_ref()
.and_then(|addr| addr.as_socket_addr())
.map(|addr| addr.port())
.unwrap_or(0);
let mut _dual_stack_ref: Option<
std::sync::Arc<crate::high_level::runtime::dual_stack::DualStackSocket>,
> = None;
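        // Bind ladder: prefer two separate sockets (true dual-stack), then a
        // single socket bound to the IPv6 unspecified address, then plain
        // IPv4 as the last resort.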
let mut inner = match crate::transport::UdpTransport::bind_dual_stack_for_endpoint(
requested_port,
)
.await
{
Ok((transport, dual_socket)) => {
let (v4_addr, v6_addr) = dual_socket.local_addrs();
info!(
"Bound dual-socket: IPv4={}, IPv6={} (true dual-stack, separate sockets)",
v4_addr
.map(|a| a.to_string())
.unwrap_or_else(|| "none".into()),
v6_addr
.map(|a| a.to_string())
.unwrap_or_else(|| "none".into()),
);
let actual_bind_addr = dual_socket.local_addr().map_err(|e| {
EndpointError::Config(format!("Failed to get local address: {e}"))
})?;
let mut transport_registry = config.transport_registry.clone();
crate::transport::register_best_effort_transports(&mut transport_registry).await;
transport_registry.register(Arc::new(transport));
nat_config.transport_registry = Some(Arc::new(transport_registry));
nat_config.bind_addr = Some(actual_bind_addr);
if let Some(v4_addr) = v4_addr {
nat_config.additional_bind_addrs.push(v4_addr);
}
let abs_socket: std::sync::Arc<dyn AsyncUdpSocket> = dual_socket.clone();
_dual_stack_ref = Some(dual_socket);
NatTraversalEndpoint::new_with_abstract_socket(
nat_config,
Some(event_callback),
Some(token_store.clone()),
abs_socket,
)
.await
.map_err(|e| EndpointError::Config(e.to_string()))?
}
Err(e) => {
info!("Dual-socket failed ({e}), falling back to single-socket");
let dual_stack_default: std::net::SocketAddr = std::net::SocketAddr::new(
std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
requested_port,
);
let ipv4_fallback: std::net::SocketAddr = std::net::SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
requested_port,
);
let bind_addr = config
.bind_addr
.as_ref()
.and_then(|addr| addr.as_socket_addr())
.unwrap_or(dual_stack_default);
let (transport, quinn_socket) =
match crate::transport::UdpTransport::bind_for_quinn(bind_addr).await {
Ok(result) => result,
Err(e2) if bind_addr == dual_stack_default => {
info!("Single-socket dual-stack failed ({e2}), falling back to IPv4");
crate::transport::UdpTransport::bind_for_quinn(ipv4_fallback)
.await
.map_err(|e3| {
EndpointError::Config(format!(
"All socket binds failed (dual: {e}, v6: {e2}, v4: {e3})"
))
})?
}
Err(e2) => {
return Err(EndpointError::Config(format!(
"Failed to bind UDP socket: {e2}"
)));
}
};
let actual_bind_addr = quinn_socket.local_addr().map_err(|e2| {
EndpointError::Config(format!("Failed to get local address: {e2}"))
})?;
info!(
"Bound single socket at {} ({})",
actual_bind_addr,
if actual_bind_addr.is_ipv6() {
"dual-stack IPv4+IPv6"
} else {
"IPv4 only"
}
);
let mut transport_registry = config.transport_registry.clone();
crate::transport::register_best_effort_transports(&mut transport_registry).await;
transport_registry.register(Arc::new(transport));
nat_config.transport_registry = Some(Arc::new(transport_registry));
nat_config.bind_addr = Some(actual_bind_addr);
NatTraversalEndpoint::new_with_socket(
nat_config,
Some(event_callback),
Some(token_store.clone()),
Some(quinn_socket),
)
.await
.map_err(|e2| EndpointError::Config(e2.to_string()))?
}
};
inner.set_local_peer_id(peer_id);
let transport_registry = inner
.transport_registry()
.cloned()
.unwrap_or_else(|| Arc::new(crate::transport::TransportRegistry::new()));
let inner_arc = Arc::new(inner);
let router_config = RouterConfig {
constrained_config: crate::constrained::ConstrainedTransportConfig::default(),
            prefer_quic: true,
            enable_metrics: true,
max_connections: 256,
};
let mut router = ConnectionRouter::with_full_config(
router_config,
Arc::clone(&transport_registry),
Arc::clone(&inner_arc),
);
router.set_quic_endpoint(Arc::clone(&inner_arc));
let (data_tx, data_rx) = mpsc::channel(config.data_channel_capacity);
let (reader_exit_tx, reader_exit_rx) = mpsc::unbounded_channel();
let reader_handles = Arc::new(RwLock::new(HashMap::new()));
let peer_activity = Arc::new(RwLock::new(HashMap::new()));
let ack_waiters = Arc::new(ParkingRwLock::new(HashMap::new()));
let (peer_event_tx, _) = broadcast::channel(PEER_EVENT_CHANNEL_CAPACITY);
let peer_event_channels = Arc::new(ParkingRwLock::new(HashMap::new()));
let peer_event_generations = Arc::new(ParkingRwLock::new(HashMap::new()));
let endpoint = Self {
inner: inner_arc,
connected_peers: Arc::new(RwLock::new(HashMap::new())),
stats,
config,
event_tx,
peer_id,
public_key: public_key_bytes,
shutdown: CancellationToken::new(),
pending_data: Arc::new(RwLock::new(BoundedPendingBuffer::default())),
bootstrap_cache,
peer_hint_records: Arc::new(RwLock::new(HashMap::new())),
transport_registry,
router: Arc::new(RwLock::new(router)),
constrained_connections: Arc::new(RwLock::new(HashMap::new())),
constrained_peer_addrs: Arc::new(RwLock::new(HashMap::new())),
manual_known_peer_udp_addrs: Arc::new(RwLock::new(Vec::new())),
port_mapping_state: Arc::new(ParkingRwLock::new(PortMappingSnapshot::default())),
mdns_state: Arc::new(ParkingRwLock::new(MdnsSnapshot::default())),
mdns_auto_connect_inflight: Arc::new(ParkingRwLock::new(HashSet::new())),
direct_path_statuses,
data_tx,
data_rx: Arc::new(tokio::sync::Mutex::new(data_rx)),
reader_exit_tx,
reader_exit_rx: Arc::new(tokio::sync::Mutex::new(reader_exit_rx)),
reader_handles,
peer_activity,
ack_waiters,
peer_event_tx,
peer_event_channels,
peer_event_generations,
coordinator_health: Arc::new(crate::coordinator_health::CoordinatorHealth::new()),
};
endpoint.spawn_constrained_poller();
endpoint.spawn_peer_address_update_poller();
endpoint.spawn_stale_connection_reaper();
endpoint.spawn_reader_exit_handler();
endpoint.spawn_port_mapping_task();
endpoint.spawn_mdns_task();
endpoint.spawn_proactive_relay_manager();
Ok(endpoint)
}
pub fn peer_id(&self) -> PeerId {
self.peer_id
}
pub fn get_quic_connection(
&self,
peer_id: &PeerId,
) -> Result<Option<crate::high_level::Connection>, EndpointError> {
self.inner
.get_connection(peer_id)
.map_err(EndpointError::NatTraversal)
}
pub fn local_addr(&self) -> Option<SocketAddr> {
self.inner
.get_endpoint()
.and_then(|ep| ep.local_addr().ok())
}
pub fn external_addr(&self) -> Option<SocketAddr> {
self.inner.get_observed_external_address().ok().flatten()
}
pub fn all_external_addrs(&self) -> Vec<SocketAddr> {
let mut addrs = self
.inner
.get_all_observed_external_addresses()
.unwrap_or_default();
if let Some(mapped_addr) = self.port_mapping_addr() {
if !addrs.contains(&mapped_addr) {
addrs.push(mapped_addr);
}
}
addrs
}
pub(crate) fn port_mapping_snapshot(&self) -> PortMappingSnapshot {
*self.port_mapping_state.read()
}
pub fn port_mapping_active(&self) -> bool {
self.port_mapping_snapshot().active
}
pub fn port_mapping_addr(&self) -> Option<SocketAddr> {
self.port_mapping_snapshot().external_addr
}
pub fn mdns_snapshot(&self) -> MdnsSnapshot {
self.mdns_state.read().clone()
}
pub fn direct_path_status(&self, peer_id: PeerId) -> Option<DirectPathStatus> {
self.direct_path_statuses.read().get(&peer_id).cloned()
}
    pub const fn relay_service_enabled(&self) -> bool {
        true
    }
pub const fn coordinator_service_enabled(&self) -> bool {
true
}
pub const fn bootstrap_service_enabled(&self) -> bool {
true
}
pub fn transport_registry(&self) -> &TransportRegistry {
&self.transport_registry
}
pub fn public_key_bytes(&self) -> &[u8] {
&self.public_key
}
pub async fn connect_addr(&self, addr: SocketAddr) -> Result<PeerConnection, EndpointError> {
self.connect_orchestrated(None, vec![addr]).await
}
async fn prepare_direct_addr_attempt(
&self,
addr: SocketAddr,
) -> Result<Option<PeerConnection>, EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
        {
            let peers = self.connected_peers.read().await;
            for (peer_id, existing) in peers.iter() {
                if existing.remote_addr == TransportAddr::Udp(addr) {
                    if self.inner.is_peer_connected(peer_id) {
                        info!(
                            "connect: reusing existing live connection to {} (peer {:?})",
                            addr, peer_id
                        );
                        return Ok(Some(existing.clone()));
                    }
                    break;
                }
            }
        }
{
let mut peers = self.connected_peers.write().await;
let stale_peer_ids: Vec<PeerId> = peers
.iter()
.filter(|(_, p)| p.remote_addr == TransportAddr::Udp(addr))
.filter(|(id, _)| !self.inner.is_peer_connected(id))
.map(|(id, _)| *id)
.collect();
for stale_id in &stale_peer_ids {
peers.remove(stale_id);
info!(
"connect: removed stale connection entry for peer {:?} at {}",
stale_id, addr
);
}
}
Ok(None)
}
async fn attempt_direct_handshake(
&self,
addr: SocketAddr,
) -> Result<crate::high_level::Connection, EndpointError> {
info!("Connecting directly to {}", addr);
let endpoint = self
.inner
.get_endpoint()
.ok_or_else(|| EndpointError::Config("QUIC endpoint not available".to_string()))?;
let connecting = endpoint
.connect(addr, "peer")
.map_err(|e| EndpointError::Connection(e.to_string()))?;
let handshake_timeout = self
.config
.timeouts
.nat_traversal
.connection_establishment_timeout;
match timeout(handshake_timeout, connecting).await {
Ok(Ok(conn)) => Ok(conn),
Ok(Err(e)) => {
info!("connect: handshake to {} failed: {}", addr, e);
Err(EndpointError::Connection(e.to_string()))
}
Err(_) => {
info!(
"connect: handshake to {} timed out after {:?}",
addr, handshake_timeout
);
Err(EndpointError::Timeout)
}
}
}
async fn connect_direct_addr(&self, addr: SocketAddr) -> Result<PeerConnection, EndpointError> {
self.connect_direct_addr_with_hint(addr, None).await
}
async fn connect_direct_addr_with_hint(
&self,
addr: SocketAddr,
hint_peer_id: Option<PeerId>,
) -> Result<PeerConnection, EndpointError> {
if let Some(existing) = self.prepare_direct_addr_attempt(addr).await? {
return Ok(existing);
}
let connection = self.attempt_direct_handshake(addr).await?;
self.finalize_direct_connection(connection, addr, hint_peer_id)
.await
}
fn runtime_known_peer_udp_addrs(&self) -> Vec<SocketAddr> {
let mut addrs: Vec<SocketAddr> = self
.config
.known_peers
.iter()
.filter_map(|addr| addr.as_socket_addr())
.collect();
for addr in self.inner.bootstrap_addresses() {
if !addrs.contains(&addr) {
addrs.push(addr);
}
}
addrs
}
async fn peer_directory_snapshot(&self) -> PeerDirectorySnapshot {
let mut snapshot = PeerDirectorySnapshot::default();
for addr in self
.config
.known_peers
.iter()
.filter_map(TransportAddr::as_socket_addr)
{
snapshot.add_locator_claim(
None,
vec![addr],
PeerDiscoverySource::StaticKnownPeer,
None,
);
}
for addr in self
.manual_known_peer_udp_addrs
.read()
.await
.iter()
.copied()
{
snapshot.add_locator_claim(
None,
vec![addr],
PeerDiscoverySource::ManualKnownPeer,
None,
);
}
for addr in self.runtime_known_peer_udp_addrs() {
snapshot.add_locator_claim(
None,
vec![addr],
PeerDiscoverySource::RuntimeKnownPeer,
None,
);
}
{
let hints = self.peer_hint_records.read().await;
for (peer_id, record) in hints.iter() {
for addr in &record.addrs {
snapshot.add_authenticated_addr(
*peer_id,
*addr,
PeerDiscoverySource::PeerHints,
);
}
snapshot.add_authenticated_capabilities(
*peer_id,
&record.capabilities,
PeerDiscoverySource::PeerHints,
);
}
}
for peer in self.bootstrap_cache.all_peers().await {
snapshot.add_cached_peer(&peer);
}
for peer in self.mdns_snapshot().discovered_peers {
snapshot.add_locator_claim(
peer.claimed_peer_id,
peer.addresses.clone(),
PeerDiscoverySource::Mdns,
Some(peer),
);
}
snapshot
}
fn discovered_peer_allowed(&self, claimed_peer_id: Option<PeerId>) -> Result<(), String> {
match &self.config.trust {
TrustPolicy::AuthenticateOnly => Ok(()),
TrustPolicy::AllowedPeerIds(peer_ids) => {
let claimed_peer_id =
claimed_peer_id.ok_or_else(|| "missing claimed peer identity".to_string())?;
if peer_ids.contains(&claimed_peer_id) {
Ok(())
} else {
Err(format!(
"peer {} is not in the discovery allowlist",
hex::encode(claimed_peer_id.0)
))
}
}
}
}
async fn connect_direct_candidates(
&self,
addrs: &[SocketAddr],
hint_peer_id: Option<PeerId>,
) -> Result<PeerConnection, EndpointError> {
let mut last_err: Option<EndpointError> = None;
for addr in addrs {
match self
.connect_direct_addr_with_hint(*addr, hint_peer_id)
.await
{
Ok(conn) => return Ok(conn),
Err(err) => last_err = Some(err),
}
}
Err(last_err.unwrap_or(EndpointError::NoAddress))
}
async fn refresh_runtime_known_peer_connections(&self) {
for addr in self.runtime_known_peer_udp_addrs() {
if let Err(e) = self.connect_direct_addr(addr).await {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "connect_direct_addr_drop",
addr = ?addr,
error = ?e,
"silent drop"
);
}
}
}
#[cfg(test)]
async fn hinted_addrs_for_peer(&self, peer_id: PeerId) -> Vec<SocketAddr> {
self.peer_hint_records
.read()
.await
.get(&peer_id)
.map(|record| record.addrs.clone())
.unwrap_or_default()
}
async fn hinted_assist_addrs(&self, relay: bool, coordination: bool) -> Vec<SocketAddr> {
let hints = self.peer_hint_records.read().await;
let mut candidates = Vec::new();
for record in hints.values() {
let matches = (relay && record.capabilities.supports_relay)
|| (coordination && record.capabilities.supports_coordination);
if !matches {
continue;
}
for addr in &record.addrs {
if !candidates.contains(addr) {
candidates.push(*addr);
}
}
}
candidates
}
async fn coordinator_candidates(&self) -> Vec<SocketAddr> {
let mut candidates = Vec::new();
if let Some(addr) = self.inner.preferred_coordinator()
&& !candidates.contains(&addr)
{
candidates.push(addr);
}
for addr in self.hinted_assist_addrs(false, true).await {
if !candidates.contains(&addr) {
candidates.push(addr);
}
}
for peer in self.bootstrap_cache.select_coordinators(6).await {
for addr in peer.preferred_addresses() {
if !candidates.contains(&addr) {
candidates.push(addr);
}
}
}
{
let peers = self.connected_peers.read().await;
for (peer_id, existing) in peers.iter() {
let Some(addr) = existing.remote_addr.as_socket_addr() else {
continue;
};
if !self.inner.is_peer_connected(peer_id) {
continue;
}
if !candidates.contains(&addr) {
candidates.push(addr);
}
}
}
for addr in self.runtime_known_peer_udp_addrs() {
if !candidates.contains(&addr) {
candidates.push(addr);
}
}
self.coordinator_health.filter_available(&candidates)
}
pub(crate) async fn runtime_assist_snapshot(&self) -> RuntimeAssistSnapshot {
let successful_coordinations = self
.inner
.get_statistics()
.map(|stats| stats.successful_coordinations)
.unwrap_or(0);
let (active_relay_sessions, relay_bytes_forwarded) =
self.inner.relay_server_runtime_metrics();
RuntimeAssistSnapshot {
successful_coordinations,
active_relay_sessions,
relay_bytes_forwarded,
}
}
async fn find_live_connection_for_addrs(&self, addrs: &[SocketAddr]) -> Option<PeerConnection> {
let peers = self.connected_peers.read().await;
for addr in addrs {
if let Some((existing_peer_id, existing)) = peers
.iter()
.find(|(_, p)| p.remote_addr == TransportAddr::Udp(*addr))
.map(|(id, conn)| (*id, conn.clone()))
{
if self.inner.is_peer_connected(&existing_peer_id) {
return Some(existing);
}
}
}
None
}
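    /// Unified dial pipeline behind `connect_addr`, `connect_peer`, and
    /// `connect_peer_with_addrs`:
    /// 1. reuse any live connection to the peer or a target address;
    /// 2. purge stale entries for those addresses;
    /// 3. widen the candidate set from the peer directory and runtime
    ///    bootstrap hints, then rank it (global-scope candidates first);
    /// 4. attempt direct dials, then the per-family fallback path;
    /// 5. fall back to coordinator-assisted NAT traversal as a last resort.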
async fn connect_orchestrated(
&self,
peer_id: Option<PeerId>,
mut explicit_addrs: Vec<SocketAddr>,
) -> Result<PeerConnection, EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let is_simple_address_only = peer_id.is_none() && explicit_addrs.len() == 1;
if let Some(peer_id) = peer_id {
if let Some(conn) = self.connected_peers.read().await.get(&peer_id) {
if self.inner.is_peer_connected(&peer_id) {
return Ok(conn.clone());
}
}
}
if !is_simple_address_only {
let peers = self.connected_peers.read().await;
for addr in &explicit_addrs {
if let Some((existing_peer_id, existing)) = peers
.iter()
.find(|(_, p)| p.remote_addr == TransportAddr::Udp(*addr))
.map(|(id, conn)| (*id, conn.clone()))
{
if self.inner.is_peer_connected(&existing_peer_id) {
info!(
"connect_orchestrated: reusing existing live connection to {} (peer {:?})",
addr, existing_peer_id
);
return Ok(existing);
}
}
}
}
if !is_simple_address_only {
let target_addrs = explicit_addrs.clone();
let mut peers = self.connected_peers.write().await;
let stale_peer_ids: Vec<PeerId> = peers
.iter()
.filter(|(_, p)| match p.remote_addr {
TransportAddr::Udp(addr) => target_addrs.contains(&addr),
_ => false,
})
.filter(|(id, _)| !self.inner.is_peer_connected(id))
.map(|(id, _)| *id)
.collect();
for stale_id in &stale_peer_ids {
peers.remove(stale_id);
}
}
if let Some(peer_id) = peer_id {
let directory = self.peer_directory_snapshot().await;
for addr in directory.candidate_addrs_for_peer(peer_id) {
if !explicit_addrs.contains(&addr) {
explicit_addrs.push(addr);
}
}
}
if let Some(peer_id) = peer_id
&& let Some(runtime_addr) = self.inner.bootstrap_address_for_peer(peer_id)
&& !explicit_addrs.contains(&runtime_addr)
{
explicit_addrs.push(runtime_addr);
}
prioritize_direct_candidate_addrs(&mut explicit_addrs);
drop_non_global_direct_candidates_when_global_present(&mut explicit_addrs);
if !explicit_addrs.is_empty() && !is_simple_address_only {
match self
.connect_direct_candidates(&explicit_addrs, peer_id)
.await
{
Ok(conn) => return Ok(conn),
Err(err) => {
debug!(
"connect_orchestrated: direct multi-candidate pre-pass exhausted before fallback: {}",
err
);
}
}
}
let target_ipv4 = explicit_addrs.iter().copied().find(SocketAddr::is_ipv4);
let target_ipv6 = explicit_addrs.iter().copied().find(SocketAddr::is_ipv6);
if target_ipv4.is_some() || target_ipv6.is_some() {
match self
.connect_with_fallback(target_ipv4, target_ipv6, None, peer_id)
.await
{
Ok((conn, _)) => return Ok(conn),
Err(err) => {
let peers = self.connected_peers.read().await;
for addr in &explicit_addrs {
if let Some((existing_peer_id, existing)) = peers
.iter()
.find(|(_, p)| p.remote_addr == TransportAddr::Udp(*addr))
.map(|(id, conn)| (*id, conn.clone()))
{
if self.inner.is_peer_connected(&existing_peer_id) {
info!(
"connect_orchestrated: converged to existing live connection after fallback failure for {} (peer {:?})",
addr, existing_peer_id
);
return Ok(existing);
}
}
}
return Err(err);
}
}
}
if let Some(peer_id) = peer_id {
if explicit_addrs.is_empty() && self.inner.preferred_coordinator().is_none() {
self.refresh_runtime_known_peer_connections().await;
if let Some(conn) = self.connected_peers.read().await.get(&peer_id)
&& self.inner.is_peer_connected(&peer_id)
{
return Ok(conn.clone());
}
}
#[allow(deprecated)]
{
return self.connect_to_peer(peer_id, None).await;
}
}
Err(EndpointError::NoAddress)
}
#[deprecated(
note = "use connect_addr(addr) to route address-based dials through the unified orchestrator"
)]
pub async fn connect(&self, addr: SocketAddr) -> Result<PeerConnection, EndpointError> {
self.connect_addr(addr).await
}
pub async fn connect_transport(
&self,
addr: &TransportAddr,
peer_id: Option<PeerId>,
) -> Result<PeerConnection, EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let mut router = self.router.write().await;
let engine = router.select_engine_for_addr(addr);
info!(
"Connecting to {} via {:?} engine (peer_id: {:?})",
addr, engine, peer_id
);
match engine {
ProtocolEngine::Quic => {
let socket_addr = addr.as_socket_addr().ok_or_else(|| {
EndpointError::Connection(format!(
"Cannot extract socket address from {} for QUIC",
addr
))
})?;
                drop(router);
                self.connect_addr(socket_addr).await
}
ProtocolEngine::Constrained => {
let _routed = router.connect(addr).map_err(|e| {
EndpointError::Connection(format!("Constrained connection failed: {}", e))
})?;
let actual_peer_id = peer_id.unwrap_or_else(|| peer_id_from_transport_addr(addr));
let peer_conn = PeerConnection {
peer_id: actual_peer_id,
remote_addr: addr.clone(),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
                    authenticated: false,
                    connected_at: Instant::now(),
last_activity: Instant::now(),
};
                drop(router);
                self.register_connected_peer(peer_conn.clone()).await;
self.observe_peer_reachability(&peer_conn);
Ok(peer_conn)
}
}
}
pub async fn router(&self) -> tokio::sync::RwLockReadGuard<'_, ConnectionRouter> {
self.router.read().await
}
pub async fn routing_stats(&self) -> crate::connection_router::RouterStats {
self.router.read().await.stats().clone()
}
pub async fn register_constrained_connection(
&self,
peer_id: PeerId,
conn_id: ConstrainedConnectionId,
) -> Option<ConstrainedConnectionId> {
let old = self
.constrained_connections
.write()
.await
.insert(peer_id, conn_id);
debug!(
"Registered constrained connection for peer {:?}: conn_id={:?}",
peer_id, conn_id
);
old
}
pub async fn unregister_constrained_connection(
&self,
peer_id: &PeerId,
) -> Option<ConstrainedConnectionId> {
let removed = self.constrained_connections.write().await.remove(peer_id);
if removed.is_some() {
debug!("Unregistered constrained connection for peer {:?}", peer_id);
}
removed
}
pub async fn has_constrained_connection(&self, peer_id: &PeerId) -> bool {
self.constrained_connections
.read()
.await
.contains_key(peer_id)
}
pub async fn get_constrained_connection_id(
&self,
peer_id: &PeerId,
) -> Option<ConstrainedConnectionId> {
self.constrained_connections
.read()
.await
.get(peer_id)
.copied()
}
pub async fn constrained_connection_count(&self) -> usize {
self.constrained_connections.read().await.len()
}
pub async fn peer_id_from_constrained_conn(
&self,
conn_id: ConstrainedConnectionId,
) -> Option<PeerId> {
self.constrained_peer_addrs
.read()
.await
.get(&conn_id)
.map(|(peer_id, _)| *peer_id)
}
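    /// Dials both address families concurrently, each under its own adaptive
    /// stage budget. If both families succeed, the IPv6 connection is
    /// returned (both stay registered); otherwise whichever family connected
    /// wins.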
pub async fn connect_dual_stack(
&self,
addresses: &[SocketAddr],
peer_id: Option<PeerId>,
) -> Result<PeerConnection, EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let ipv4_addrs: Vec<SocketAddr> = addresses
.iter()
.filter(|addr| matches!(addr.ip(), IpAddr::V4(_)))
.copied()
.collect();
let ipv6_addrs: Vec<SocketAddr> = addresses
.iter()
.filter(|addr| matches!(addr.ip(), IpAddr::V6(_)))
.copied()
.collect();
let mut direct_candidate_addrs = Vec::new();
extend_unique_socket_addrs(&mut direct_candidate_addrs, ipv4_addrs.iter().copied());
extend_unique_socket_addrs(&mut direct_candidate_addrs, ipv6_addrs.iter().copied());
let (direct_strategy, rtt_hint) = self
.adaptive_strategy_config_for_candidates(peer_id, &direct_candidate_addrs, None, &[])
.await;
info!(
"Dual-stack connect: {} IPv4, {} IPv6 addresses (PeerId: {:?}, direct budgets: v4={:?}, v6={:?}, rtt_hint={:?})",
ipv4_addrs.len(),
ipv6_addrs.len(),
peer_id,
direct_strategy.ipv4_timeout,
direct_strategy.ipv6_timeout,
rtt_hint,
);
        let (ipv4_result, ipv6_result) = tokio::join!(
            self.try_connect_family(&ipv4_addrs, "IPv4", direct_strategy.ipv4_timeout, peer_id),
            self.try_connect_family(&ipv6_addrs, "IPv6", direct_strategy.ipv6_timeout, peer_id),
        );
match (ipv4_result, ipv6_result) {
(Some(v4_conn), Some(v6_conn)) => {
info!(
"✓✓ Dual-stack success! IPv4: {}, IPv6: {} (maintaining both connections)",
v4_conn.remote_addr, v6_conn.remote_addr
);
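                // Prefer the IPv6 connection when both families succeed; the
                // IPv4 connection remains registered alongside it.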
Ok(v6_conn)
}
(Some(v4_conn), None) => {
info!(
"IPv4-only connection established to {}",
v4_conn.remote_addr
);
Ok(v4_conn)
}
(None, Some(v6_conn)) => {
info!(
"IPv6-only connection established to {}",
v6_conn.remote_addr
);
Ok(v6_conn)
}
(None, None) => {
warn!("Both IPv4 and IPv6 direct connections failed");
Err(EndpointError::Connection(
"Dual-stack connection failed for both address families".to_string(),
))
}
}
}
async fn try_connect_family(
&self,
addresses: &[SocketAddr],
family_name: &str,
stage_budget: Duration,
hint_peer_id: Option<PeerId>,
) -> Option<PeerConnection> {
try_addrs_with_shared_stage_budget(
addresses,
family_name,
stage_budget,
|addr| async move { self.connect_direct_addr_with_hint(addr, hint_peer_id).await },
)
.await
}
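    /// Connects to a peer using addresses remembered in the bootstrap cache.
    /// Returns the existing connection if the peer is already connected, and
    /// `PeerNotFound` if the cache holds no entry for `peer_id`.
    ///
    /// Sketch (placeholder names):
    ///
    /// ```ignore
    /// let conn = endpoint.connect_cached(peer_id).await?;
    /// ```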
pub async fn connect_cached(&self, peer_id: PeerId) -> Result<PeerConnection, EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
if let Some(conn) = self.connected_peers.read().await.get(&peer_id) {
return Ok(conn.clone());
}
let cached_peer = self
.bootstrap_cache
.get_peer(&peer_id)
.await
.ok_or(EndpointError::PeerNotFound(peer_id))?;
let preferred_addrs = cached_peer.preferred_addresses();
debug!(
"Connecting to cached peer {:?} ({} preferred addresses)",
peer_id,
preferred_addrs.len()
);
self.connect_dual_stack(&preferred_addrs, Some(peer_id))
.await
}
pub async fn connect_peer(&self, peer_id: PeerId) -> Result<PeerConnection, EndpointError> {
self.connect_orchestrated(Some(peer_id), Vec::new()).await
}
pub async fn connect_peer_with_addrs(
&self,
peer_id: PeerId,
addrs: Vec<SocketAddr>,
) -> Result<PeerConnection, EndpointError> {
self.connect_orchestrated(Some(peer_id), addrs).await
}
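    /// Merges out-of-band address and capability hints for a peer into both
    /// the in-memory hint records and the persistent bootstrap cache. Calls
    /// with no addresses and no capabilities only touch the in-memory record.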
pub async fn upsert_peer_hints(
&self,
peer_id: PeerId,
addrs: Vec<SocketAddr>,
capabilities: Option<PeerCapabilities>,
) {
{
let mut hints = self.peer_hint_records.write().await;
hints
.entry(peer_id)
.or_default()
.merge(addrs.clone(), capabilities.clone());
}
if addrs.is_empty() && capabilities.is_none() {
return;
}
let mut cached_peer = self
.bootstrap_cache
.get_peer(&peer_id)
.await
.unwrap_or_else(|| CachedPeer::new(peer_id, Vec::new(), PeerSource::Merge));
for addr in addrs {
if !cached_peer.addresses.contains(&addr) {
cached_peer.addresses.push(addr);
}
}
if let Some(caps) = capabilities {
cached_peer
.capabilities
.record_assist_hints(caps.supports_relay, caps.supports_coordination);
cached_peer.capabilities.protocols.extend(caps.protocols);
if caps.nat_type.is_some() {
cached_peer.capabilities.nat_type = caps.nat_type;
}
for addr in caps.external_addresses {
cached_peer.capabilities.record_external_address(addr);
}
}
self.bootstrap_cache.upsert(cached_peer).await;
}
#[deprecated(
note = "use connect_peer(peer_id) to route peer-oriented dials through the unified orchestrator"
)]
pub async fn connect_to_peer(
&self,
peer_id: PeerId,
coordinator: Option<SocketAddr>,
) -> Result<PeerConnection, EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let coord_addr = if let Some(addr) = coordinator {
addr
} else {
self.coordinator_candidates()
.await
.into_iter()
.next()
.ok_or_else(|| EndpointError::Config("No coordinator available".to_string()))?
};
info!(
"Initiating NAT traversal to peer {:?} via coordinator {}",
peer_id, coord_addr
);
if let Err(e) = self.event_tx.send(P2pEvent::NatTraversalProgress {
peer_id,
phase: TraversalPhase::Discovery,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_nat_progress_discovery",
peer_id = ?peer_id,
error = %e,
"silent drop"
);
}
if let Err(e) = self.inner.initiate_nat_traversal(peer_id, coord_addr) {
self.coordinator_health.record_failure(coord_addr);
return Err(EndpointError::NatTraversal(e));
}
let deadline = tokio::time::Instant::now()
+ self
.config
.timeouts
.nat_traversal
.connection_establishment_timeout;
loop {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
if let Some(conn) = self
.inner
.get_connection_by_authenticated_peer(peer_id)
.await
.or_else(|| self.inner.session_connection(peer_id))
{
info!(
"connect_to_peer observed existing inner connection for peer {:?}; finalizing",
peer_id
);
let remote_address = conn.remote_address();
let side = conn.side();
self.inner
.register_connection_peer_id(remote_address, peer_id);
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(remote_address),
traversal_method: TraversalMethod::HolePunch,
side,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
let endpoint = self.clone();
tokio::spawn(async move {
endpoint.spawn_reader_task(peer_id, conn).await;
});
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
self.coordinator_health.record_success(&coord_addr);
publish_direct_path_status(
self.direct_path_statuses.as_ref(),
&self.event_tx,
peer_id,
DirectPathStatus::Established {
remote_addr: remote_address,
},
);
return Ok(peer_conn);
}
let events = self
.inner
.poll(Instant::now())
.map_err(EndpointError::NatTraversal)?;
let had_events = !events.is_empty();
for event in events {
info!(
"connect_to_peer polled event for target {:?}: {:?}",
peer_id, event
);
match event {
NatTraversalEvent::ConnectionEstablished {
peer_id: evt_peer,
remote_address,
side,
..
} if evt_peer == peer_id => {
self.inner
.register_connection_peer_id(remote_address, peer_id);
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(remote_address),
traversal_method: TraversalMethod::HolePunch,
side,
                            authenticated: true,
                            connected_at: Instant::now(),
last_activity: Instant::now(),
};
if let Some(conn) = self
.inner
.get_connection_by_authenticated_peer(peer_id)
.await
.or_else(|| self.inner.session_connection(peer_id))
{
let endpoint = self.clone();
tokio::spawn(async move {
endpoint.spawn_reader_task(peer_id, conn).await;
});
}
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
self.coordinator_health.record_success(&coord_addr);
publish_direct_path_status(
self.direct_path_statuses.as_ref(),
&self.event_tx,
peer_id,
DirectPathStatus::Established {
remote_addr: remote_address,
},
);
return Ok(peer_conn);
}
NatTraversalEvent::TraversalFailed {
peer_id: evt_peer,
error,
..
} if evt_peer == peer_id => {
self.coordinator_health.record_failure(coord_addr);
return Err(EndpointError::NatTraversal(error));
}
_ => {}
}
}
if let Some(conn) = self
.inner
.get_connection_by_authenticated_peer(peer_id)
.await
.or_else(|| self.inner.session_connection(peer_id))
{
info!(
"connect_to_peer observed existing inner connection for peer {:?}; finalizing",
peer_id
);
let remote_address = conn.remote_address();
let side = conn.side();
self.inner
.register_connection_peer_id(remote_address, peer_id);
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(remote_address),
traversal_method: TraversalMethod::HolePunch,
side,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
let endpoint = self.clone();
tokio::spawn(async move {
endpoint.spawn_reader_task(peer_id, conn).await;
});
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
self.coordinator_health.record_success(&coord_addr);
publish_direct_path_status(
self.direct_path_statuses.as_ref(),
&self.event_tx,
peer_id,
DirectPathStatus::Established {
remote_addr: remote_address,
},
);
return Ok(peer_conn);
}
if had_events {
continue;
}
match self.wait_for_traversal_progress(peer_id, deadline).await {
Ok(()) => {}
Err(EndpointError::ShuttingDown) => {
self.coordinator_health.record_failure(coord_addr);
return Err(EndpointError::ShuttingDown);
}
Err(EndpointError::Timeout) => {
self.coordinator_health.record_failure(coord_addr);
return Err(EndpointError::Timeout);
}
Err(error) => {
self.coordinator_health.record_failure(coord_addr);
return Err(error);
}
}
}
}
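    // Aggregates RTT hints from the bootstrap cache: the target peer's own
    // averaged RTT plus any cached RTTs observed for the candidate,
    // coordinator, and relay addresses. The maximum is taken so adaptive
    // budgets stay conservative on slow paths.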
async fn connection_strategy_rtt_hint_for_candidates(
&self,
peer_id: Option<PeerId>,
direct_candidate_addrs: &[SocketAddr],
coordinator: Option<SocketAddr>,
relay_addrs: &[SocketAddr],
) -> Option<Duration> {
let mut hints = Vec::new();
if let Some(peer_id) = peer_id
&& let Some(cached_peer) = self.bootstrap_cache.get_peer(&peer_id).await
&& let Some(rtt) = cached_peer_avg_rtt(&cached_peer)
{
hints.push(rtt);
}
let mut candidate_addrs = Vec::new();
extend_unique_socket_addrs(&mut candidate_addrs, direct_candidate_addrs.iter().copied());
extend_unique_socket_addrs(&mut candidate_addrs, coordinator);
extend_unique_socket_addrs(&mut candidate_addrs, relay_addrs.iter().copied());
if !candidate_addrs.is_empty() {
let peers = self.bootstrap_cache.all_peers().await;
if let Some(rtt) = strategy_rtt_hint_from_cached_peers(&peers, &candidate_addrs) {
hints.push(rtt);
}
}
hints.into_iter().max()
}
async fn adaptive_strategy_config_for_candidates(
&self,
peer_id: Option<PeerId>,
direct_candidate_addrs: &[SocketAddr],
coordinator: Option<SocketAddr>,
relay_addrs: &[SocketAddr],
) -> (StrategyConfig, Option<Duration>) {
let rtt_hint = self
.connection_strategy_rtt_hint_for_candidates(
peer_id,
direct_candidate_addrs,
coordinator,
relay_addrs,
)
.await;
let mut config = StrategyConfig::default();
config.apply_adaptive_timeouts(
self.config
.timeouts
.nat_traversal
.connection_establishment_timeout,
self.config.timeouts.nat_traversal.coordination_timeout,
rtt_hint,
);
(config, rtt_hint)
}
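    /// Staged connection establishment: direct dials (Happy Eyeballs across
    /// both address families), then coordinator-assisted hole punching, then
    /// MASQUE relay, reporting which method ultimately succeeded. Passing
    /// `None` for `strategy_config` enables adaptive per-stage budgets.
    ///
    /// Sketch (placeholder names):
    ///
    /// ```ignore
    /// let (conn, method) = endpoint
    ///     .connect_with_fallback(Some(v4_addr), None, None, Some(peer_id))
    ///     .await?;
    /// tracing::info!("connected via {:?}", method);
    /// ```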
pub async fn connect_with_fallback(
&self,
target_ipv4: Option<SocketAddr>,
target_ipv6: Option<SocketAddr>,
strategy_config: Option<StrategyConfig>,
peer_id: Option<PeerId>,
) -> Result<(PeerConnection, ConnectionMethod), EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let custom_strategy_supplied = strategy_config.is_some();
let mut config = strategy_config.unwrap_or_default();
if config.coordinator.is_none() {
config.coordinator = self.coordinator_candidates().await.into_iter().next();
}
if config.relay_addrs.is_empty() {
let target_addr = target_ipv4.or(target_ipv6);
if let Some(addr) = target_addr {
let relays = self
.bootstrap_cache
.select_relays_for_target(1, &addr, true)
.await;
if let Some(best_relay) = relays.first() {
if let Some(relay_addr) = best_relay.preferred_addresses().first().copied() {
config.relay_addrs.push(relay_addr);
debug!(
"Selected optimized relay from cache: {:?} for target {}",
relay_addr, addr
);
}
}
}
if config.relay_addrs.is_empty() {
let target_addr = target_ipv4.or(target_ipv6);
for relay_addr in self.hinted_assist_addrs(true, false).await {
if Some(relay_addr) == target_addr {
continue;
}
if let Some(target) = target_addr
&& relay_addr.is_ipv4() != target.is_ipv4()
{
continue;
}
if !config.relay_addrs.contains(&relay_addr) {
config.relay_addrs.push(relay_addr);
}
}
}
if config.relay_addrs.is_empty() {
let target_addr = target_ipv4.or(target_ipv6);
let peers = self.connected_peers.read().await;
for (existing_peer_id, existing) in peers.iter() {
let Some(relay_addr) = existing.remote_addr.as_socket_addr() else {
continue;
};
if Some(relay_addr) == target_addr {
continue;
}
if !self.inner.is_peer_connected(existing_peer_id) {
continue;
}
if !config.relay_addrs.contains(&relay_addr) {
config.relay_addrs.push(relay_addr);
}
}
}
if config.relay_addrs.is_empty() {
let target_addr = target_ipv4.or(target_ipv6);
for relay_addr in self.runtime_known_peer_udp_addrs() {
if Some(relay_addr) == target_addr {
continue;
}
if !config.relay_addrs.contains(&relay_addr) {
config.relay_addrs.push(relay_addr);
}
}
}
if config.relay_addrs.is_empty() {
if let Some(relay_addr) = self.config.nat.relay_nodes.first().copied() {
config.relay_addrs.push(relay_addr);
}
}
}
if !custom_strategy_supplied {
let mut direct_candidate_addrs = Vec::new();
extend_unique_socket_addrs(
&mut direct_candidate_addrs,
[target_ipv4, target_ipv6].into_iter().flatten(),
);
let (adaptive_config, rtt_hint) = self
.adaptive_strategy_config_for_candidates(
peer_id,
&direct_candidate_addrs,
config.coordinator,
&config.relay_addrs,
)
.await;
config.ipv4_timeout = adaptive_config.ipv4_timeout;
config.ipv6_timeout = adaptive_config.ipv6_timeout;
config.holepunch_timeout = adaptive_config.holepunch_timeout;
config.relay_timeout = adaptive_config.relay_timeout;
debug!(
"Adaptive connection strategy budgets: direct={:?}, holepunch={:?}, relay={:?}, rtt_hint={:?}",
config.ipv4_timeout, config.holepunch_timeout, config.relay_timeout, rtt_hint
);
}
let mut strategy = ConnectionStrategy::new(config);
let overall_deadline = tokio::time::Instant::now()
+ self
.config
.timeouts
.nat_traversal
.connection_establishment_timeout;
info!(
"Starting fallback connection: IPv4={:?}, IPv6={:?} (PeerId: {:?})",
target_ipv4, target_ipv6, peer_id
);
let mut direct_addresses: Vec<SocketAddr> = Vec::new();
if let Some(v6) = target_ipv6 {
direct_addresses.push(v6);
}
if let Some(v4) = target_ipv4 {
direct_addresses.push(v4);
}
loop {
match strategy.current_stage().clone() {
ConnectionStage::DirectIPv4 { .. } => {
if direct_addresses.is_empty() {
debug!("No direct addresses provided, skipping to hole-punch");
strategy.transition_to_ipv6("No direct addresses");
continue;
}
for addr in &direct_addresses {
if let Some(existing) = self.prepare_direct_addr_attempt(*addr).await? {
let method = if addr.is_ipv6() {
ConnectionMethod::DirectIPv6
} else {
ConnectionMethod::DirectIPv4
};
info!(
"Direct stage: reusing existing exact-address connection to {}",
addr
);
return Ok((existing, method));
}
}
let he_config = HappyEyeballsConfig::default();
let direct_timeout = strategy.ipv4_timeout().max(strategy.ipv6_timeout());
let handshake_timeout = self
.config
.timeouts
.nat_traversal
.connection_establishment_timeout;
info!(
"Happy Eyeballs: racing {} direct addresses (timeout: {:?})",
direct_addresses.len(),
direct_timeout
);
let quic_endpoint = match self.inner.get_endpoint().cloned() {
Some(ep) => ep,
None => {
debug!("QUIC endpoint not available, skipping direct");
strategy.transition_to_ipv6("QUIC endpoint not available");
strategy.transition_to_holepunch("QUIC endpoint not available");
continue;
}
};
let addrs = direct_addresses.clone();
let he_result = timeout(direct_timeout, async {
happy_eyeballs::race_connect(&addrs, &he_config, |addr| {
let ep = quic_endpoint.clone();
async move {
let connecting = ep
.connect(addr, "peer")
.map_err(|e| format!("connect error: {e}"))?;
match timeout(handshake_timeout, connecting).await {
Ok(Ok(conn)) => Ok(conn),
Ok(Err(e)) => Err(format!("handshake error: {e}")),
Err(_) => Err(format!(
"handshake timeout after {:?}",
handshake_timeout
)),
}
}
})
.await
})
.await;
match he_result {
Ok(Ok((connection, winning_addr))) => {
let method = if winning_addr.is_ipv6() {
ConnectionMethod::DirectIPv6
} else {
ConnectionMethod::DirectIPv4
};
info!(
"Happy Eyeballs: {} connection to {} succeeded",
method, winning_addr
);
let peer_conn = self
.finalize_direct_connection(connection, winning_addr, peer_id)
.await?;
return Ok((peer_conn, method));
}
Ok(Err(e)) => {
if let Some(existing) =
self.find_live_connection_for_addrs(&direct_addresses).await
{
let method = existing
.remote_addr
.as_socket_addr()
.map(|addr| {
if addr.is_ipv6() {
ConnectionMethod::DirectIPv6
} else {
ConnectionMethod::DirectIPv4
}
})
.unwrap_or(ConnectionMethod::DirectIPv4);
debug!(
"Happy Eyeballs: direct race exhausted but converged to existing live connection"
);
return Ok((existing, method));
}
debug!("Happy Eyeballs: all direct attempts failed: {}", e);
strategy.transition_to_ipv6(e.to_string());
strategy.transition_to_holepunch("Happy Eyeballs exhausted");
}
Err(_) => {
if let Some(existing) =
self.find_live_connection_for_addrs(&direct_addresses).await
{
let method = existing
.remote_addr
.as_socket_addr()
.map(|addr| {
if addr.is_ipv6() {
ConnectionMethod::DirectIPv6
} else {
ConnectionMethod::DirectIPv4
}
})
.unwrap_or(ConnectionMethod::DirectIPv4);
debug!(
"Happy Eyeballs: direct race timed out but converged to existing live connection"
);
return Ok((existing, method));
}
debug!("Happy Eyeballs: direct connection timed out");
strategy.transition_to_ipv6("Timeout");
strategy.transition_to_holepunch("Happy Eyeballs timed out");
}
}
}
ConnectionStage::DirectIPv6 { .. } => {
debug!(
"DirectIPv6 stage reached after Happy Eyeballs, advancing to hole-punch"
);
strategy.transition_to_holepunch("Handled by Happy Eyeballs");
}
ConnectionStage::HolePunching {
coordinator, round, ..
} => {
let target = target_ipv4
.or(target_ipv6)
.ok_or(EndpointError::NoAddress)?;
info!(
"Trying hole-punch to {} via {} (round {})",
target, coordinator, round
);
let target_peer_id =
peer_id.unwrap_or_else(|| peer_id_from_socket_addr(target));
match self
.start_hole_punch_session(coordinator, target_peer_id)
.await
{
Ok(()) => match self
.await_hole_punch_outcome(target, target_peer_id, overall_deadline)
.await
{
Ok(conn) => {
info!("✓ Hole-punch succeeded to {} via {}", target, coordinator);
return Ok((conn, ConnectionMethod::HolePunched { coordinator }));
}
Err(error) => {
let retryable = strategy.should_retry_holepunch()
&& error
.retry_reason()
.is_some_and(Self::should_retry_hole_punch_reason);
let error_text = error.to_string();
strategy.record_holepunch_error(round, error_text.clone());
if retryable {
debug!(
"Hole-punch round {} failed with retryable reason, retrying",
round
);
strategy.increment_round();
} else {
debug!("Hole-punch failed after {} rounds", round);
strategy.transition_to_relay(error_text);
}
}
},
Err(e) => {
let retryable = strategy.should_retry_holepunch()
&& match &e {
EndpointError::NatTraversal(error) => {
TraversalFailureReason::from_public_operation_error(error)
.as_ref()
.is_some_and(Self::should_retry_hole_punch_reason)
}
_ => false,
};
let error_text = e.to_string();
strategy.record_holepunch_error(round, error_text.clone());
if retryable {
debug!("Hole-punch round {} failed to start, retrying", round);
strategy.increment_round();
} else {
debug!("Hole-punch failed to start after {} rounds", round);
strategy.transition_to_relay(error_text);
}
}
}
}
ConnectionStage::Relay { relay_addr, .. } => {
let fallback_target = target_ipv4.or(target_ipv6);
let target = self
.select_relay_target_addr(peer_id, target_ipv4, target_ipv6)
.await
.ok_or(EndpointError::NoAddress)?;
if Some(target) != fallback_target {
debug!(
"Relay target selection preferred durable address {} over fallback {:?}",
target, fallback_target
);
}
info!("Trying relay connection to {} via {}", target, relay_addr);
match timeout(
strategy.relay_timeout(),
self.try_relay_connection(target, relay_addr, peer_id),
)
.await
{
Ok(Ok(conn)) => {
info!(
"✓ Relay connection succeeded to {} via {}",
target, relay_addr
);
publish_direct_path_status(
self.direct_path_statuses.as_ref(),
&self.event_tx,
conn.peer_id,
DirectPathStatus::BestEffortUnavailable {
reason: DirectPathUnavailableReason::RelayRequired,
},
);
return Ok((conn, ConnectionMethod::Relayed { relay: relay_addr }));
}
Ok(Err(e)) => {
debug!("Relay connection failed: {}", e);
strategy.transition_to_next_relay(e.to_string());
}
Err(_) => {
debug!("Relay connection timed out");
strategy.transition_to_next_relay("Timeout");
}
}
}
ConnectionStage::Failed { errors } => {
let error_summary = errors
.iter()
.map(|e| format!("{:?}: {}", e.method, e.error))
.collect::<Vec<_>>()
.join("; ");
return Err(EndpointError::AllStrategiesFailed(error_summary));
}
ConnectionStage::Connected { via } => {
return Err(EndpointError::Connection(format!(
"Connection strategy reached terminal connected state without returning: {:?}",
via
)));
}
}
}
}
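    // Registers a freshly handshaken direct connection. If registration loses
    // a lifecycle race (another connection for the same peer already won),
    // the live winner is returned instead; if it supersedes an older
    // generation, that generation's reader task is cancelled.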
async fn finalize_direct_connection(
&self,
connection: crate::high_level::Connection,
addr: SocketAddr,
hint_peer_id: Option<PeerId>,
) -> Result<PeerConnection, EndpointError> {
let peer_id = self
.inner
.extract_peer_id_from_connection(&connection)
.await
.or(hint_peer_id)
.unwrap_or_else(|| peer_id_from_socket_addr(addr));
let registration = self
.inner
.add_connection_with_outcome(peer_id, connection.clone())
.map_err(EndpointError::NatTraversal)?;
if matches!(
registration,
crate::nat_traversal_api::ConnectionRegistrationOutcome::Rejected { .. }
) {
if let Some(existing) = self.connected_peers.read().await.get(&peer_id).cloned() {
return Ok(existing);
}
let live_connection = self
.inner
.get_connection(&peer_id)
.map_err(EndpointError::NatTraversal)?
.ok_or_else(|| {
EndpointError::Connection(
"connection lost lifecycle race with no live winner".to_string(),
)
})?;
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(live_connection.remote_address()),
traversal_method: TraversalMethod::Direct,
side: live_connection.side(),
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
return Ok(peer_conn);
}
self.inner.register_connection_peer_id(addr, peer_id);
self.inner
.record_bootstrap_direct_connection(peer_id, &addr, Some(connection.rtt()));
let reader_conn = connection.clone();
self.inner
.spawn_connection_handler(peer_id, connection, Side::Client)
.map_err(EndpointError::NatTraversal)?;
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(addr),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
self.spawn_reader_task(peer_id, reader_conn).await;
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
if let crate::nat_traversal_api::ConnectionRegistrationOutcome::Live {
superseded_generation: Some(generation),
..
} = registration
{
self.cancel_reader_generation(&peer_id, generation).await;
}
publish_direct_path_status(
self.direct_path_statuses.as_ref(),
&self.event_tx,
peer_id,
DirectPathStatus::Established { remote_addr: addr },
);
Ok(peer_conn)
}
async fn start_hole_punch_session(
&self,
coordinator: SocketAddr,
peer_id: PeerId,
) -> Result<(), EndpointError> {
if !self.is_connected_to_addr(coordinator).await {
debug!("Connecting to coordinator {} first", coordinator);
self.connect_direct_addr(coordinator).await?;
}
self.inner
.initiate_nat_traversal(peer_id, coordinator)
.map_err(EndpointError::NatTraversal)
}
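    // Drives the traversal event loop for a single hole-punch attempt: polls
    // the inner endpoint, watches for ConnectionEstablished /
    // TraversalSucceeded (success) or terminal failure events, converges on
    // any already-established inner connection, and otherwise parks on
    // wait_for_traversal_progress until the overall deadline.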
async fn await_hole_punch_outcome(
&self,
target: SocketAddr,
peer_id: PeerId,
overall_deadline: tokio::time::Instant,
) -> Result<PeerConnection, HolePunchAwaitError> {
loop {
if self.shutdown.is_cancelled() {
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Err(HolePunchAwaitError::Endpoint(EndpointError::ShuttingDown));
}
if tokio::time::Instant::now() >= overall_deadline {
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Err(HolePunchAwaitError::Endpoint(EndpointError::Timeout));
}
let events = self
.inner
.poll(Instant::now())
.map_err(HolePunchAwaitError::from_nat_traversal_error)?;
if let Some(rejection) = take_live_rejection(self.inner.local_peer_id(), peer_id) {
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Err(HolePunchAwaitError::TraversalFailure(
TraversalFailureReason::CoordinationRejected {
reason: rejection.reason,
},
));
}
let had_events = !events.is_empty();
for event in events {
info!(
"await_hole_punch_outcome polled event for target {:?}: {:?}",
peer_id, event
);
match event {
NatTraversalEvent::ConnectionEstablished {
peer_id: evt_peer,
remote_address,
side,
..
} if evt_peer == peer_id || remote_address == target => {
self.inner
.register_connection_peer_id(remote_address, evt_peer);
let peer_conn = PeerConnection {
peer_id: evt_peer,
remote_addr: TransportAddr::Udp(remote_address),
traversal_method: TraversalMethod::HolePunch,
side,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
if let Some(conn) = self
.inner
.get_connection_by_authenticated_peer(evt_peer)
.await
.or_else(|| self.inner.session_connection(evt_peer))
{
let endpoint = self.clone();
tokio::spawn(async move {
endpoint.spawn_reader_task(evt_peer, conn).await;
});
}
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Ok(peer_conn);
}
NatTraversalEvent::TraversalSucceeded {
peer_id: evt_peer, ..
} if evt_peer == peer_id => {
if let Some(conn) = self
.inner
.get_connection_by_authenticated_peer(peer_id)
.await
.or_else(|| self.inner.session_connection(peer_id))
{
let remote_address = conn.remote_address();
let side = conn.side();
self.inner
.register_connection_peer_id(remote_address, peer_id);
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(remote_address),
traversal_method: TraversalMethod::HolePunch,
side,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
let endpoint = self.clone();
tokio::spawn(async move {
endpoint.spawn_reader_task(peer_id, conn).await;
});
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Ok(peer_conn);
}
}
NatTraversalEvent::TraversalTerminated {
peer_id: evt_peer,
reason,
..
} if evt_peer == peer_id => {
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Err(HolePunchAwaitError::TraversalFailure(reason));
}
NatTraversalEvent::CoordinationRejected {
peer_id: evt_peer,
reason,
..
} if evt_peer == peer_id => {
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Err(HolePunchAwaitError::TraversalFailure(
TraversalFailureReason::CoordinationRejected { reason },
));
}
NatTraversalEvent::TraversalFailed {
peer_id: evt_peer,
error,
..
} if evt_peer == peer_id => {
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Err(HolePunchAwaitError::from_nat_traversal_error(error));
}
_ => {}
}
}
if let Some(conn) = self
.inner
.get_connection_by_authenticated_peer(peer_id)
.await
.or_else(|| self.inner.session_connection(peer_id))
{
info!(
"await_hole_punch_outcome observed existing inner connection for peer {:?}; finalizing",
peer_id
);
let remote_address = conn.remote_address();
let side = conn.side();
self.inner
.register_connection_peer_id(remote_address, peer_id);
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(remote_address),
traversal_method: TraversalMethod::HolePunch,
side,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
let endpoint = self.clone();
tokio::spawn(async move {
endpoint.spawn_reader_task(peer_id, conn).await;
});
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Ok(peer_conn);
}
if had_events {
continue;
}
match self
.wait_for_traversal_progress(peer_id, overall_deadline)
.await
{
Ok(()) => {}
Err(error) => {
let _ = clear_live_request(self.inner.local_peer_id(), peer_id);
return Err(HolePunchAwaitError::Endpoint(error));
}
}
}
}
async fn wait_for_traversal_progress(
&self,
peer_id: PeerId,
deadline: tokio::time::Instant,
) -> Result<(), EndpointError> {
let traversal_notified = self.inner.traversal_event_notify().notified();
tokio::pin!(traversal_notified);
traversal_notified.as_mut().enable();
let wake_at = self
.inner
.next_session_poll_deadline(peer_id, Instant::now())
.map(tokio::time::Instant::from_std)
.map(|next| next.min(deadline))
.unwrap_or(deadline);
tokio::select! {
_ = traversal_notified.as_mut() => Ok(()),
_ = tokio::time::sleep_until(wake_at) => {
if wake_at >= deadline {
Err(EndpointError::Timeout)
} else {
Ok(())
}
}
_ = self.shutdown.cancelled() => Err(EndpointError::ShuttingDown),
}
}
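    // Transient failures (coordinator hiccups, missed punch windows, network
    // errors) are worth another round; deterministic rejections and protocol
    // violations are not.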
fn should_retry_hole_punch_reason(reason: &TraversalFailureReason) -> bool {
match reason {
TraversalFailureReason::CoordinatorUnavailable
| TraversalFailureReason::CoordinationExpired
| TraversalFailureReason::PunchWindowMissed
| TraversalFailureReason::ValidationTimedOut
| TraversalFailureReason::NetworkError(_) => true,
TraversalFailureReason::DiscoveryExhausted
| TraversalFailureReason::CoordinationRejected { .. }
| TraversalFailureReason::SynchronizationExpired
| TraversalFailureReason::ValidationFailed
| TraversalFailureReason::ConnectionFailed
| TraversalFailureReason::ProtocolViolation(_)
| TraversalFailureReason::ShuttingDown => false,
}
}
#[cfg(test)]
fn endpoint_error_from_traversal_failure(reason: TraversalFailureReason) -> EndpointError {
match reason {
TraversalFailureReason::CoordinatorUnavailable => {
EndpointError::NatTraversal(NatTraversalError::NoBootstrapNodes)
}
TraversalFailureReason::DiscoveryExhausted => {
EndpointError::NatTraversal(NatTraversalError::NoCandidatesFound)
}
TraversalFailureReason::CoordinationRejected { reason } => EndpointError::NatTraversal(
NatTraversalError::CoordinationFailed(format!("coordination rejected: {reason}")),
),
TraversalFailureReason::CoordinationExpired
| TraversalFailureReason::SynchronizationExpired
| TraversalFailureReason::PunchWindowMissed
| TraversalFailureReason::ValidationTimedOut => EndpointError::Timeout,
TraversalFailureReason::ValidationFailed => EndpointError::NatTraversal(
NatTraversalError::ValidationFailed("traversal validation failed".to_string()),
),
TraversalFailureReason::ConnectionFailed => EndpointError::NatTraversal(
NatTraversalError::ConnectionFailed("hole-punch connection failed".to_string()),
),
TraversalFailureReason::ProtocolViolation(message) => {
EndpointError::NatTraversal(NatTraversalError::ProtocolError(message))
}
TraversalFailureReason::NetworkError(message) => {
EndpointError::NatTraversal(NatTraversalError::NetworkError(message))
}
TraversalFailureReason::ShuttingDown => EndpointError::ShuttingDown,
}
}
async fn select_relay_target_addr(
&self,
peer_id: Option<PeerId>,
fallback_ipv4: Option<SocketAddr>,
fallback_ipv6: Option<SocketAddr>,
) -> Option<SocketAddr> {
let mut listener_addrs = Vec::new();
let mut reachable_addrs = Vec::new();
let mut external_addrs = Vec::new();
if let Some(peer_id) = peer_id {
if let Some(cached_peer) = self.bootstrap_cache.get_peer(&peer_id).await {
extend_unique_socket_addrs(&mut listener_addrs, cached_peer.addresses);
extend_unique_socket_addrs(
&mut reachable_addrs,
cached_peer
.capabilities
.reachable_addresses
.iter()
.map(|entry| entry.address),
);
extend_unique_socket_addrs(
&mut external_addrs,
cached_peer.capabilities.external_addresses,
);
}
if let Some(hints) = self.peer_hint_records.read().await.get(&peer_id).cloned() {
extend_unique_socket_addrs(&mut listener_addrs, hints.addrs);
extend_unique_socket_addrs(
&mut reachable_addrs,
hints
.capabilities
.reachable_addresses
.iter()
.map(|entry| entry.address),
);
extend_unique_socket_addrs(
&mut external_addrs,
hints.capabilities.external_addresses,
);
}
}
select_preferred_relay_target_addr(
&listener_addrs,
&reachable_addrs,
&external_addrs,
fallback_ipv4,
fallback_ipv6,
)
}
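    // Establishes a MASQUE relay session via `relay_addr`, then dials the
    // target through the shared relay endpoint. A durable peer identity is
    // required: relayed connections without one are rejected.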
async fn try_relay_connection(
&self,
target: SocketAddr,
relay_addr: SocketAddr,
hint_peer_id: Option<PeerId>,
) -> Result<PeerConnection, EndpointError> {
info!(
"Attempting MASQUE relay connection to {} via {}",
target, relay_addr
);
let (public_addr, relay_endpoint) = self
.inner
.ensure_shared_relay_endpoint(relay_addr)
.await
.map_err(EndpointError::NatTraversal)?;
info!(
"MASQUE relay session established via {} (public addr: {:?})",
relay_addr, public_addr
);
let connecting = relay_endpoint.connect(target, "peer").map_err(|e| {
EndpointError::Connection(format!("Failed to initiate relay connection: {}", e))
})?;
let handshake_timeout = self
.config
.timeouts
.nat_traversal
.connection_establishment_timeout;
let connection = match timeout(handshake_timeout, connecting).await {
Ok(Ok(conn)) => conn,
Ok(Err(e)) => {
info!(
"Relay connection handshake to {} via {} failed: {}",
target, relay_addr, e
);
return Err(EndpointError::Connection(e.to_string()));
}
Err(_) => {
info!(
"Relay connection handshake to {} via {} timed out",
target, relay_addr
);
return Err(EndpointError::Timeout);
}
};
let relay_peer_id = self
.inner
.extract_peer_id_from_connection(&connection)
.await
.or(hint_peer_id)
.ok_or_else(|| {
EndpointError::Connection(
"Relay connection established without a durable peer identity".to_string(),
)
})?;
let registration = self
.inner
.add_connection_with_outcome(relay_peer_id, connection.clone())
.map_err(EndpointError::NatTraversal)?;
if matches!(
registration,
crate::nat_traversal_api::ConnectionRegistrationOutcome::Rejected { .. }
) {
if let Some(existing) = self
.connected_peers
.read()
.await
.get(&relay_peer_id)
.cloned()
{
return Ok(existing);
}
let live_connection = self
.inner
.get_connection(&relay_peer_id)
.map_err(EndpointError::NatTraversal)?
.ok_or_else(|| {
EndpointError::Connection(
"relay connection lost lifecycle race with no live winner".to_string(),
)
})?;
let peer_conn = PeerConnection {
peer_id: relay_peer_id,
remote_addr: TransportAddr::Udp(live_connection.remote_address()),
traversal_method: TraversalMethod::Relay,
side: live_connection.side(),
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
self.register_connected_peer(peer_conn.clone()).await;
return Ok(peer_conn);
}
self.inner
.register_connection_peer_id(target, relay_peer_id);
let reader_conn = connection.clone();
self.inner
.spawn_connection_handler(relay_peer_id, connection, Side::Client)
.map_err(EndpointError::NatTraversal)?;
let peer_conn = PeerConnection {
peer_id: relay_peer_id,
remote_addr: TransportAddr::Udp(target),
traversal_method: TraversalMethod::Relay,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
self.spawn_reader_task(relay_peer_id, reader_conn).await;
self.register_connected_peer(peer_conn.clone()).await;
if let crate::nat_traversal_api::ConnectionRegistrationOutcome::Live {
superseded_generation: Some(generation),
..
} = registration
{
self.cancel_reader_generation(&relay_peer_id, generation)
.await;
}
info!(
"MASQUE relay connection succeeded to {} via {}",
target, relay_addr
);
Ok(peer_conn)
}
async fn persist_direct_peer_reachability_if_applicable(
bootstrap_cache: &BootstrapCache,
peer_conn: &PeerConnection,
) {
if !peer_conn.traversal_method.is_direct() {
return;
}
if let Some(socket_addr) = peer_conn.remote_addr.as_socket_addr() {
bootstrap_cache
.observe_direct_reachability(peer_conn.peer_id, socket_addr)
.await;
}
}
fn observe_peer_reachability(&self, peer_conn: &PeerConnection) {
let cache = Arc::clone(&self.bootstrap_cache);
let peer_conn = peer_conn.clone();
tokio::spawn(async move {
Self::persist_direct_peer_reachability_if_applicable(cache.as_ref(), &peer_conn).await;
});
}
fn live_connection_snapshot(
&self,
peer_id: &PeerId,
) -> Option<crate::nat_traversal_api::ConnectionLifecycleSnapshot> {
self.inner
.get_connection(peer_id)
.ok()
.flatten()
.and_then(|connection| {
self.inner
.connection_snapshot_by_stable_id(peer_id, connection.stable_id())
})
}
fn emit_peer_lifecycle_event(&self, peer_id: PeerId, event: PeerLifecycleEvent) {
emit_peer_lifecycle_event(
&self.peer_event_tx,
self.peer_event_channels.as_ref(),
peer_id,
event,
);
}
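    // Draws a random 16-byte tag and retries until it is unique among the
    // in-flight ACK waiters for this connection's stable id, so concurrent
    // requests on one connection can never collide.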
fn next_ack_request_tag(&self, stable_id: usize) -> [u8; 16] {
loop {
let mut tag = [0u8; 16];
rand::thread_rng().fill_bytes(&mut tag);
let exists = self
.ack_waiters
.read()
.get(&stable_id)
.is_some_and(|entry| entry.contains_key(&tag));
if !exists {
return tag;
}
}
}
async fn send_ack_control_frame(
connection: crate::high_level::Connection,
tag: [u8; 16],
outcome: AckControlOutcome,
) {
let bytes = encode_ack_control(tag, outcome);
match connection.open_uni().await {
Ok(mut stream) => {
if let Err(error) = stream.write_all(&bytes).await {
warn!(error = %error, "failed to send ACK control frame");
return;
}
if let Err(error) = stream.finish() {
warn!(error = %error, "failed to finish ACK control frame stream");
}
}
Err(error) => {
warn!(error = %error, "failed to open ACK control stream");
}
}
}
async fn register_connected_peer(&self, peer_conn: PeerConnection) {
store_connected_peer(
self.connected_peers.as_ref(),
self.stats.as_ref(),
&self.event_tx,
peer_conn.clone(),
)
.await;
if let Some(snapshot) = self.live_connection_snapshot(&peer_conn.peer_id) {
let lifecycle_events = {
let mut generations = self.peer_event_generations.write();
match generations.insert(peer_conn.peer_id, snapshot.generation) {
None => vec![PeerLifecycleEvent::Established {
generation: snapshot.generation,
}],
Some(previous_generation) if previous_generation != snapshot.generation => {
vec![
PeerLifecycleEvent::Replaced {
old_generation: previous_generation,
new_generation: snapshot.generation,
},
PeerLifecycleEvent::Closing {
generation: previous_generation,
reason: ConnectionCloseReason::Superseded,
},
PeerLifecycleEvent::Closed {
generation: previous_generation,
reason: ConnectionCloseReason::Superseded,
},
]
}
Some(_) => Vec::new(),
}
};
for event in lifecycle_events {
self.emit_peer_lifecycle_event(peer_conn.peer_id, event);
}
}
if peer_conn.remote_addr.as_socket_addr().is_some() {
let _ = self.inner.publish_active_relay_to_peer(peer_conn.peer_id);
}
}
async fn is_connected_to_addr(&self, addr: SocketAddr) -> bool {
let transport_addr = TransportAddr::Udp(addr);
let peers = self.connected_peers.read().await;
peers.values().any(|p| p.remote_addr == transport_addr)
}
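    /// Accepts the next inbound connection, re-keying the registration when
    /// the peer's authenticated identity differs from the id reported at
    /// accept time. Returns `None` on shutdown or when the handshake or
    /// registration fails.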
pub async fn accept(&self) -> Option<PeerConnection> {
if self.shutdown.is_cancelled() {
return None;
}
let result = tokio::select! {
r = self.inner.accept_connection() => r,
_ = self.shutdown.cancelled() => return None,
};
match result {
Ok((peer_id, connection)) => {
let remote_addr = connection.remote_address();
let mut resolved_peer_id = peer_id;
let mut registration = None;
if let Some(actual_peer_id) = self
.inner
.extract_peer_id_from_connection(&connection)
.await
{
if actual_peer_id != peer_id {
let _ = self.inner.remove_connection(&peer_id);
match self
.inner
.add_connection_with_outcome(actual_peer_id, connection.clone())
.map_err(EndpointError::NatTraversal)
{
Ok(outcome) => {
if matches!(
outcome,
crate::nat_traversal_api::ConnectionRegistrationOutcome::Rejected { .. }
) {
return None;
}
registration = Some(outcome);
resolved_peer_id = actual_peer_id;
}
Err(e) => {
error!("Failed to register re-keyed inbound connection: {}", e);
return None;
}
}
}
}
self.inner
.register_connection_peer_id(remote_addr, resolved_peer_id);
let reader_conn = connection.clone();
if let Err(e) =
self.inner
.spawn_connection_handler(resolved_peer_id, connection, Side::Server)
{
error!("Failed to spawn connection handler: {}", e);
return None;
}
let peer_conn = PeerConnection {
peer_id: resolved_peer_id,
remote_addr: TransportAddr::Udp(remote_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Server,
                    authenticated: true,
                    connected_at: Instant::now(),
last_activity: Instant::now(),
};
self.spawn_reader_task(resolved_peer_id, reader_conn).await;
self.observe_peer_reachability(&peer_conn);
self.register_connected_peer(peer_conn.clone()).await;
if let Some(crate::nat_traversal_api::ConnectionRegistrationOutcome::Live {
superseded_generation: Some(generation),
..
}) = registration
{
self.cancel_reader_generation(&resolved_peer_id, generation)
.await;
}
Some(peer_conn)
}
Err(e) => {
debug!("Accept failed: {}", e);
None
}
}
}
async fn cleanup_connection(&self, peer_id: &PeerId, reason: DisconnectReason) {
let close_reason = close_reason_for_disconnect(&reason);
do_cleanup_connection(
&*self.connected_peers,
&*self.inner,
&*self.reader_handles,
&*self.direct_path_statuses,
&*self.stats,
&self.event_tx,
&self.peer_event_tx,
self.peer_event_channels.as_ref(),
self.peer_event_generations.as_ref(),
self.ack_waiters.as_ref(),
peer_id,
reason,
close_reason,
)
.await;
}
pub async fn disconnect(&self, peer_id: &PeerId) -> Result<(), EndpointError> {
if self.connected_peers.read().await.contains_key(peer_id) {
self.cleanup_connection(peer_id, DisconnectReason::Normal)
.await;
Ok(())
} else {
Err(EndpointError::PeerNotFound(*peer_id))
}
}
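    /// Sends `data` on a fresh unidirectional stream (QUIC) or through the
    /// constrained engine, depending on the router's engine selection for the
    /// peer's transport address.
    ///
    /// Sketch (placeholder names):
    ///
    /// ```ignore
    /// endpoint.send(&peer_id, b"hello").await?;
    /// ```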
pub async fn send(&self, peer_id: &PeerId, data: &[u8]) -> Result<(), EndpointError> {
let peer_id_for_log = *peer_id;
let result: Result<(), EndpointError> = async {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let transport_addr = {
let peer_info = self.connected_peers.read().await;
if let Some(conn) = peer_info.get(peer_id) {
conn.remote_addr.clone()
} else if let Some(connection) = self
.inner
.get_connection(peer_id)
.map_err(EndpointError::NatTraversal)?
{
TransportAddr::Udp(connection.remote_address())
} else {
return Err(EndpointError::PeerNotFound(*peer_id));
}
};
let engine = {
let mut router = self.router.write().await;
router.select_engine_for_addr(&transport_addr)
};
match engine {
crate::transport::ProtocolEngine::Quic => {
let connection = self
.inner
.get_connection(peer_id)
.map_err(EndpointError::NatTraversal)?
.ok_or(EndpointError::PeerNotFound(*peer_id))?;
if let Some(reason) = close_reason_from_connection(&connection) {
return Err(EndpointError::ConnectionClosed { reason });
}
let mut send_stream = connection
.open_uni()
.await
.map_err(endpoint_error_from_connection_error)?;
send_stream
.write_all(data)
.await
.map_err(endpoint_error_from_write_error)?;
send_stream.finish().map_err(|e| {
close_reason_from_connection(&connection)
.map(|reason| EndpointError::ConnectionClosed { reason })
.unwrap_or_else(|| EndpointError::Connection(e.to_string()))
})?;
debug!("Sent {} bytes to peer {:?} via QUIC", data.len(), peer_id);
}
crate::transport::ProtocolEngine::Constrained => {
let maybe_conn_id = self
.constrained_connections
.read()
.await
.get(peer_id)
.copied();
if let Some(conn_id) = maybe_conn_id {
let engine = self.inner.constrained_engine();
let responses = {
let mut engine = engine.lock();
engine
.send(conn_id, data)
.map_err(|e| EndpointError::Connection(e.to_string()))?
};
for (_dest_addr, packet_data) in responses {
self.transport_registry
.send(&packet_data, &transport_addr)
.await
.map_err(|e| EndpointError::Connection(e.to_string()))?;
}
debug!(
"Sent {} bytes to peer {:?} via constrained engine ({})",
data.len(),
peer_id,
transport_addr.transport_type()
);
} else {
self.transport_registry
.send(data, &transport_addr)
.await
.map_err(|e| EndpointError::Connection(e.to_string()))?;
debug!(
"Sent {} bytes to peer {:?} via constrained transport (direct, {})",
data.len(),
peer_id,
transport_addr.transport_type()
);
}
}
}
let now = Instant::now();
note_peer_activity(
&self.connected_peers,
&self.peer_activity,
*peer_id,
PeerActivityKind::Sent,
now,
)
.await;
Ok(())
}
.await;
if let Err(ref e) = result {
tracing::warn!(
target: "ant_quic::send_error",
peer_id = ?peer_id_for_log,
error = %e,
"send failed"
);
}
result
}
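    /// Like `send`, but waits for the remote ACK control frame confirming the
    /// payload was accepted. Requires a QUIC connection that negotiated
    /// ack-receive v1; otherwise returns `NotSupported`.
    ///
    /// Sketch (placeholder names):
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// endpoint
    ///     .send_with_receive_ack(&peer_id, b"payload", Duration::from_secs(5))
    ///     .await?;
    /// ```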
pub async fn send_with_receive_ack(
&self,
peer_id: &PeerId,
data: &[u8],
timeout_duration: Duration,
) -> Result<(), EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let transport_addr = {
let peer_info = self.connected_peers.read().await;
if let Some(conn) = peer_info.get(peer_id) {
conn.remote_addr.clone()
} else if let Some(connection) = self
.inner
.get_connection(peer_id)
.map_err(EndpointError::NatTraversal)?
{
TransportAddr::Udp(connection.remote_address())
} else {
return Err(EndpointError::PeerNotFound(*peer_id));
}
};
let engine = {
let mut router = self.router.write().await;
router.select_engine_for_addr(&transport_addr)
};
if !matches!(engine, crate::transport::ProtocolEngine::Quic) {
return Err(EndpointError::NotSupported);
}
let connection = self
.inner
.get_connection(peer_id)
.map_err(EndpointError::NatTraversal)?
.ok_or(EndpointError::PeerNotFound(*peer_id))?;
if !connection.supports_ack_receive_v1() {
return Err(EndpointError::NotSupported);
}
if let Some(reason) = close_reason_from_connection(&connection) {
return Err(EndpointError::ConnectionClosed { reason });
}
let stable_id = connection.stable_id();
let tag = self.next_ack_request_tag(stable_id);
let (tx, rx) = oneshot::channel();
let inserted = register_ack_waiter(self.ack_waiters.as_ref(), stable_id, tag, tx);
if !inserted {
return Err(EndpointError::Connection(
"failed to reserve unique ACK request tag".to_string(),
));
}
let envelope = encode_ack_payload(tag, data);
let send_result = async {
let mut send_stream = connection
.open_uni()
.await
.map_err(endpoint_error_from_connection_error)?;
send_stream
.write_all(&envelope)
.await
.map_err(endpoint_error_from_write_error)?;
send_stream.finish().map_err(|e| {
close_reason_from_connection(&connection)
.map(|reason| EndpointError::ConnectionClosed { reason })
.unwrap_or_else(|| EndpointError::Connection(e.to_string()))
})
}
.await;
if let Err(error) = send_result {
if !resolve_ack_waiter(
self.ack_waiters.as_ref(),
stable_id,
tag,
AckWaiterResult::Closed(ConnectionCloseReason::LocallyClosed),
) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "resolve_ack_waiter_miss",
stable_id = stable_id,
"no waiter for tag"
);
}
return Err(error);
}
note_peer_activity(
&self.connected_peers,
&self.peer_activity,
*peer_id,
PeerActivityKind::Sent,
Instant::now(),
)
.await;
match timeout(timeout_duration, rx).await {
Ok(Ok(AckWaiterResult::Accepted)) => Ok(()),
Ok(Ok(AckWaiterResult::Rejected(reason))) => {
Err(EndpointError::ReceiveRejected { reason })
}
Ok(Ok(AckWaiterResult::Closed(reason))) => {
Err(EndpointError::ConnectionClosed { reason })
}
Ok(Err(_)) => {
if let Some(reason) = close_reason_from_connection(&connection) {
Err(EndpointError::ConnectionClosed { reason })
} else {
Err(EndpointError::Connection(
"ACK waiter dropped before completion".to_string(),
))
}
}
Err(_) => {
if !resolve_ack_waiter(
self.ack_waiters.as_ref(),
stable_id,
tag,
AckWaiterResult::Closed(ConnectionCloseReason::TimedOut),
) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "resolve_ack_waiter_miss",
stable_id = stable_id,
"no waiter for tag"
);
}
Err(EndpointError::AckTimeout)
}
}
}
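    /// Round-trips a small probe frame over the ACK machinery and returns the
    /// observed application-level latency. Subject to the same QUIC and
    /// ack-receive v1 requirements as `send_with_receive_ack`.
    ///
    /// Sketch (placeholder names):
    ///
    /// ```ignore
    /// let rtt = endpoint.probe_peer(&peer_id, Duration::from_secs(2)).await?;
    /// ```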
pub async fn probe_peer(
&self,
peer_id: &PeerId,
timeout_duration: Duration,
) -> Result<Duration, EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
let transport_addr = {
let peer_info = self.connected_peers.read().await;
if let Some(conn) = peer_info.get(peer_id) {
conn.remote_addr.clone()
} else if let Some(connection) = self
.inner
.get_connection(peer_id)
.map_err(EndpointError::NatTraversal)?
{
TransportAddr::Udp(connection.remote_address())
} else {
return Err(EndpointError::PeerNotFound(*peer_id));
}
};
let engine = {
let mut router = self.router.write().await;
router.select_engine_for_addr(&transport_addr)
};
if !matches!(engine, crate::transport::ProtocolEngine::Quic) {
return Err(EndpointError::NotSupported);
}
let connection = self
.inner
.get_connection(peer_id)
.map_err(EndpointError::NatTraversal)?
.ok_or(EndpointError::PeerNotFound(*peer_id))?;
if !connection.supports_ack_receive_v1() {
return Err(EndpointError::NotSupported);
}
if let Some(reason) = close_reason_from_connection(&connection) {
return Err(EndpointError::ConnectionClosed { reason });
}
let stable_id = connection.stable_id();
let tag = self.next_ack_request_tag(stable_id);
let (tx, rx) = oneshot::channel();
let inserted = register_ack_waiter(self.ack_waiters.as_ref(), stable_id, tag, tx);
if !inserted {
return Err(EndpointError::Connection(
"failed to reserve unique probe tag".to_string(),
));
}
let envelope = encode_probe_request(tag);
let sent_at = Instant::now();
let send_result = async {
let mut send_stream = connection
.open_uni()
.await
.map_err(endpoint_error_from_connection_error)?;
send_stream
.write_all(&envelope)
.await
.map_err(endpoint_error_from_write_error)?;
send_stream.finish().map_err(|e| {
close_reason_from_connection(&connection)
.map(|reason| EndpointError::ConnectionClosed { reason })
.unwrap_or_else(|| EndpointError::Connection(e.to_string()))
})
}
.await;
if let Err(error) = send_result {
if !resolve_ack_waiter(
self.ack_waiters.as_ref(),
stable_id,
tag,
AckWaiterResult::Closed(ConnectionCloseReason::LocallyClosed),
) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "resolve_ack_waiter_miss",
stable_id = stable_id,
"no waiter for tag"
);
}
return Err(error);
}
note_peer_activity(
&self.connected_peers,
&self.peer_activity,
*peer_id,
PeerActivityKind::Sent,
sent_at,
)
.await;
match timeout(timeout_duration, rx).await {
Ok(Ok(AckWaiterResult::Accepted)) => Ok(sent_at.elapsed()),
Ok(Ok(AckWaiterResult::Rejected(reason))) => {
Err(EndpointError::ReceiveRejected { reason })
}
Ok(Ok(AckWaiterResult::Closed(reason))) => {
Err(EndpointError::ConnectionClosed { reason })
}
Ok(Err(_)) => {
if let Some(reason) = close_reason_from_connection(&connection) {
Err(EndpointError::ConnectionClosed { reason })
} else {
Err(EndpointError::Connection(
"probe waiter dropped before completion".to_string(),
))
}
}
Err(_) => {
if !resolve_ack_waiter(
self.ack_waiters.as_ref(),
stable_id,
tag,
AckWaiterResult::Closed(ConnectionCloseReason::TimedOut),
) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "resolve_ack_waiter_miss",
stable_id = stable_id,
"no waiter for tag"
);
}
Err(EndpointError::ProbeTimeout)
}
}
}
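    /// Receives the next payload from any peer, draining the bounded pending
    /// buffer before awaiting the live data channel.
    ///
    /// Sketch (placeholder names):
    ///
    /// ```ignore
    /// let (peer_id, data) = endpoint.recv().await?;
    /// ```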
pub async fn recv(&self) -> Result<(PeerId, Vec<u8>), EndpointError> {
if self.shutdown.is_cancelled() {
return Err(EndpointError::ShuttingDown);
}
{
let mut pending = self.pending_data.write().await;
pending.cleanup_expired();
if let Some((peer_id, data)) = pending.pop_any() {
let data_len = data.len();
tracing::trace!(
"Received {} bytes from peer {:?} (from pending buffer)",
data_len,
peer_id
);
let now = Instant::now();
note_peer_activity(
&self.connected_peers,
&self.peer_activity,
peer_id,
PeerActivityKind::Received,
now,
)
.await;
if let Err(e) = self.event_tx.send(P2pEvent::DataReceived {
peer_id,
bytes: data_len,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_data_received",
peer_id = ?peer_id,
bytes = data_len,
error = %e,
"HIGH: silent drop"
);
}
return Ok((peer_id, data));
}
}
let mut rx = self.data_rx.lock().await;
tokio::select! {
msg = rx.recv() => match msg {
Some(msg) => Ok(msg),
None => Err(EndpointError::ShuttingDown),
},
_ = self.shutdown.cancelled() => Err(EndpointError::ShuttingDown),
}
}
pub fn subscribe(&self) -> broadcast::Receiver<P2pEvent> {
self.event_tx.subscribe()
}
pub fn subscribe_peer_events(
&self,
peer_id: &PeerId,
) -> broadcast::Receiver<PeerLifecycleEvent> {
peer_event_sender(self.peer_event_channels.as_ref(), *peer_id).subscribe()
}
pub fn subscribe_all_peer_events(&self) -> broadcast::Receiver<(PeerId, PeerLifecycleEvent)> {
self.peer_event_tx.subscribe()
}
pub async fn stats(&self) -> EndpointStats {
self.stats.read().await.clone()
}
pub async fn connection_metrics(&self, peer_id: &PeerId) -> Option<ConnectionMetrics> {
let connection = self.inner.get_connection(peer_id).ok()??;
let stats = connection.stats();
let rtt = connection.rtt();
let last_activity = self
.connected_peers
.read()
.await
.get(peer_id)
.map(|p| p.last_activity);
Some(ConnectionMetrics {
bytes_sent: stats.udp_tx.bytes,
bytes_received: stats.udp_rx.bytes,
rtt: Some(rtt),
packet_loss: stats.path.lost_packets as f64
/ (stats.path.sent_packets + stats.path.lost_packets).max(1) as f64,
last_activity,
})
}
pub async fn connection_health(&self, peer_id: &PeerId) -> ConnectionHealth {
let live_snapshot =
self.inner
.get_connection(peer_id)
.ok()
.flatten()
.and_then(|connection| {
self.inner
.connection_snapshot_by_stable_id(peer_id, connection.stable_id())
});
let constrained_connected = self
.constrained_connections
.read()
.await
.contains_key(peer_id);
let reader_task_active = if live_snapshot.is_some() {
Some(
self.reader_handles
.read()
.await
.get(peer_id)
.is_some_and(|handles| !handles.is_empty()),
)
} else if constrained_connected {
Some(false)
} else {
None
};
let activity = self
.peer_activity
.read()
.await
.get(peer_id)
.copied()
.unwrap_or_default();
ConnectionHealth::from_observation(
ConnectionHealthObservation {
connected: live_snapshot.is_some() || constrained_connected,
generation: live_snapshot.map(|snapshot| snapshot.generation),
reader_task_active,
last_received_at: activity.last_received_at,
last_sent_at: activity.last_sent_at,
close_reason: if live_snapshot.is_none() && !constrained_connected {
self.inner.recent_close_reason_for_peer(peer_id)
} else {
None
},
},
Instant::now(),
)
}
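    /// Dials all configured and discovered known peers: static configuration,
    /// manual and runtime directory entries, and eligible mDNS discoveries,
    /// honoring the auto-connect and approval policies. Returns the number of
    /// successful connections.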
pub async fn connect_known_peers(&self) -> Result<usize, EndpointError> {
let mut connected = 0;
let directory = self.peer_directory_snapshot().await;
let static_known_peers = if self.config.discovery.static_known_peers {
self.config.known_peers.clone()
} else {
Vec::new()
};
let manual_udp_known_peers = directory
.locator_claims()
.filter(|record| {
record
.sources
.contains(&PeerDiscoverySource::ManualKnownPeer)
})
.flat_map(|record| record.addresses.clone())
.collect::<Vec<_>>();
let runtime_udp_known_peers = directory
.locator_claims()
.filter(|record| {
record
.sources
.contains(&PeerDiscoverySource::RuntimeKnownPeer)
})
.flat_map(|record| record.addresses.clone())
.collect::<Vec<_>>();
let auto_runtime_udp_known_peers =
if self.config.discovery.auto_connect.allows_automatic_dial() {
runtime_udp_known_peers
.iter()
.copied()
.filter(|addr| !manual_udp_known_peers.contains(addr))
.collect::<Vec<_>>()
} else {
Vec::new()
};
let mdns_discovered_peers = directory
.locator_claims()
.filter(|record| record.sources.contains(&PeerDiscoverySource::Mdns))
.filter_map(|record| record.mdns_peer.clone())
.collect::<Vec<_>>();
let mut connected_udp_addrs = std::collections::HashSet::new();
for addr in &static_known_peers {
match self.connect_transport(addr, None).await {
Ok(_) => {
connected += 1;
if let Some(socket_addr) = addr.as_socket_addr() {
connected_udp_addrs.insert(socket_addr);
}
info!("Connected to known peer {}", addr);
}
Err(e) => {
warn!("Failed to connect to known peer {}: {}", addr, e);
}
}
}
for addr in &manual_udp_known_peers {
if connected_udp_addrs.contains(addr) {
continue;
}
match self.connect_addr(*addr).await {
Ok(_) => {
connected += 1;
connected_udp_addrs.insert(*addr);
info!("Connected to manual known peer {}", addr);
}
Err(e) => {
warn!("Failed to connect to manual known peer {}: {}", addr, e);
}
}
}
for addr in &auto_runtime_udp_known_peers {
if connected_udp_addrs.contains(addr) {
continue;
}
match self.connect_addr(*addr).await {
Ok(_) => {
connected += 1;
connected_udp_addrs.insert(*addr);
info!("Connected to runtime known peer {}", addr);
}
Err(e) => {
warn!("Failed to connect to runtime known peer {}: {}", addr, e);
}
}
}
for peer in &mdns_discovered_peers {
if peer
.addresses
.iter()
.all(|addr| connected_udp_addrs.contains(addr))
{
continue;
}
let mdns_policy = self
.config
.discovery
.mdns
.as_ref()
.map(|mdns| mdns.auto_connect)
.unwrap_or(AutoConnectPolicy::Disabled);
if !mdns_policy.allows_automatic_dial() {
if mdns_policy.requires_approval() {
if let Err(e) = self.event_tx.send(P2pEvent::MdnsPeerApprovalRequired {
peer: peer.clone(),
reason: "approval required by discovery policy".to_string(),
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_approval_required", error = %e, "silent drop");
}
}
continue;
}
if let Err(reason) = self.discovered_peer_allowed(peer.claimed_peer_id) {
if let Err(e) = self.event_tx.send(P2pEvent::MdnsPeerIneligible {
peer: peer.clone(),
reason,
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_ineligible", error = %e, "silent drop");
}
continue;
}
match self
.connect_orchestrated(peer.claimed_peer_id, peer.addresses.clone())
.await
{
Ok(_) => {
connected += 1;
for addr in &peer.addresses {
connected_udp_addrs.insert(*addr);
}
info!(
fullname = %peer.fullname,
addresses = ?peer.addresses,
"Connected to eligible mDNS-discovered peer"
);
}
Err(error) => {
warn!(
fullname = %peer.fullname,
addresses = ?peer.addresses,
error = %error,
"Failed to connect to eligible mDNS-discovered peer"
);
}
}
}
{
let mut stats = self.stats.write().await;
stats.connected_bootstrap_nodes = connected;
}
let total = static_known_peers.len()
+ manual_udp_known_peers
.iter()
.filter(|addr| {
!static_known_peers
.iter()
.filter_map(|known| known.as_socket_addr())
.any(|known| known == **addr)
})
.count()
+ auto_runtime_udp_known_peers
.iter()
.filter(|addr| {
!static_known_peers
.iter()
.filter_map(|known| known.as_socket_addr())
.any(|known| known == **addr)
&& !manual_udp_known_peers.contains(addr)
})
.count()
+ mdns_discovered_peers.len();
let _ = self
.event_tx
.send(P2pEvent::BootstrapStatus { connected, total });
Ok(connected)
}
pub async fn add_known_peer(&self, addr: SocketAddr) {
self.add_bootstrap(addr).await;
}
pub async fn add_bootstrap(&self, addr: SocketAddr) {
let _ = self.inner.add_bootstrap_node(addr);
{
let mut manual = self.manual_known_peer_udp_addrs.write().await;
if !manual.contains(&addr) {
manual.push(addr);
}
}
let mut stats = self.stats.write().await;
stats.total_bootstrap_nodes += 1;
}
pub async fn connected_peers(&self) -> Vec<PeerConnection> {
self.connected_peers
.read()
.await
.values()
.cloned()
.collect()
}
pub async fn is_connected(&self, peer_id: &PeerId) -> bool {
self.connected_peers.read().await.contains_key(peer_id)
}
pub async fn is_authenticated(&self, peer_id: &PeerId) -> bool {
self.connected_peers
.read()
.await
.get(peer_id)
.map(|p| p.authenticated)
.unwrap_or(false)
}
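/// Shuts the endpoint down: cancels the shared shutdown token, cancels and
/// aborts all reader tasks, disconnects every connected peer, and drains
/// the inner endpoint, bounded by `SHUTDOWN_DRAIN_TIMEOUT`.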
pub async fn shutdown(&self) {
info!("Shutting down P2P endpoint");
self.shutdown.cancel();
let handles = {
let mut handles = self.reader_handles.write().await;
std::mem::take(&mut *handles)
};
for entries in handles.into_values() {
for handle in entries {
handle.cancel.cancel();
handle.abort_handle.abort();
}
}
let peers: Vec<PeerId> = self.connected_peers.read().await.keys().copied().collect();
for peer_id in peers {
let _ = self.disconnect(&peer_id).await;
}
match timeout(SHUTDOWN_DRAIN_TIMEOUT, self.inner.shutdown()).await {
Err(_) => warn!("Inner endpoint shutdown timed out, proceeding"),
Ok(Err(e)) => warn!("Inner endpoint shutdown error: {e}"),
Ok(Ok(())) => {}
}
}
pub fn is_running(&self) -> bool {
!self.shutdown.is_cancelled()
}
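/// Returns a clone of the endpoint's shutdown token so background work can
/// observe shutdown cooperatively.
///
/// A minimal sketch (`ignore`d; `do_background_work` is a hypothetical
/// placeholder for the caller's own future):
///
/// ```ignore
/// let token = endpoint.shutdown_token();
/// tokio::spawn(async move {
///     tokio::select! {
///         _ = token.cancelled() => { /* endpoint is shutting down */ }
///         _ = do_background_work() => { /* normal completion */ }
///     }
/// });
/// ```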
pub fn shutdown_token(&self) -> CancellationToken {
self.shutdown.clone()
}
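/// Watches endpoint events and proactively establishes a relay when the
/// local NAT is detected as symmetric and no healthy relay is active: each
/// runtime-known peer UDP address is tried as a relay in turn, and a
/// `RelayEstablished` event is broadcast on success. Does nothing when
/// relay fallback is disabled in the configuration.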
fn spawn_proactive_relay_manager(&self) {
if !self.config.nat.enable_relay_fallback {
return;
}
let endpoint = self.clone();
let mut events = self.subscribe();
tokio::spawn(async move {
loop {
let event = tokio::select! {
_ = endpoint.shutdown.cancelled() => return,
event = events.recv() => match event {
Ok(event) => event,
Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
},
};
if !matches!(
event,
P2pEvent::ExternalAddressDiscovered { .. }
| P2pEvent::PeerConnected { .. }
| P2pEvent::BootstrapStatus { .. }
) {
continue;
}
if endpoint.inner.relay_public_addr().is_some() {
if endpoint.inner.is_relay_healthy() {
continue;
}
endpoint.inner.reset_relay_state();
}
if !endpoint.inner.is_symmetric_nat() {
continue;
}
let relay_candidates = endpoint.runtime_known_peer_udp_addrs();
if relay_candidates.is_empty() {
debug!("Symmetric NAT detected but no relay candidates are currently known");
continue;
}
info!(
candidate_count = relay_candidates.len(),
"Symmetric NAT detected — attempting proactive relay immediately"
);
let mut established = None;
for bootstrap in relay_candidates {
match endpoint.inner.setup_proactive_relay(bootstrap).await {
Ok(relay_addr) => {
info!(
"Proactive relay active at {} via bootstrap {}",
relay_addr, bootstrap
);
established = Some(relay_addr);
break;
}
Err(error) => {
warn!("Failed to set up relay via {}: {}", bootstrap, error);
}
}
}
if let Some(relay_addr) = established {
let _ = endpoint
.event_tx
.send(P2pEvent::RelayEstablished { relay_addr });
}
}
});
}
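/// Starts best-effort router port mapping for the local bind port, unless
/// disabled by configuration or the local bind address is unavailable.
/// Resulting mapping lifecycle events are applied via
/// `apply_port_mapping_event`.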
fn spawn_port_mapping_task(&self) {
if !self.config.nat.port_mapping.enabled {
info!("Best-effort router port mapping disabled by configuration");
return;
}
let Some(local_addr) = self.local_addr() else {
warn!(
"Skipping best-effort router port mapping because local bind address is unavailable"
);
return;
};
let endpoint = self.clone();
spawn_best_effort_port_mapping(
self.config.nat.port_mapping,
local_addr.port(),
self.shutdown.clone(),
move |event| endpoint.apply_port_mapping_event(event),
);
}
fn mdns_auto_connect_enabled(&self) -> bool {
self.config
.discovery
.mdns
.as_ref()
.is_some_and(|mdns| mdns.enabled && mdns.auto_connect.allows_automatic_dial())
}
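/// Starts the first-party mDNS runtime when discovery is configured and
/// enabled. Endpoints bound (or configured) to a loopback-only address are
/// skipped, since loopback addresses cannot be reached by other hosts.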
fn spawn_mdns_task(&self) {
let Some(mdns) = self.config.discovery.mdns.clone() else {
return;
};
if !mdns.enabled {
return;
}
let Some(local_addr) = self.local_addr() else {
warn!("Skipping first-party mDNS because local bind address is unavailable");
return;
};
let configured_loopback_only = self
.config
.bind_addr
.as_ref()
.and_then(TransportAddr::as_socket_addr)
.is_some_and(|configured| configured.ip().is_loopback());
if configured_loopback_only || local_addr.ip().is_loopback() {
info!(
configured_loopback_only,
local_addr = %local_addr,
"Skipping first-party mDNS for a loopback-only endpoint"
);
return;
}
{
let mut snapshot = self.mdns_state.write();
snapshot.browsing = mdns.mode.browse_enabled();
snapshot.service = mdns.service.clone();
snapshot.namespace = mdns.namespace.clone();
}
let endpoint = self.clone();
spawn_mdns_runtime(
mdns,
self.peer_id,
local_addr.port(),
self.shutdown.clone(),
move |event| endpoint.apply_mdns_runtime_event(event),
);
}
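/// Mirrors mDNS runtime events into the local snapshot and the broadcast
/// channel. Eligible peers additionally trigger either an approval-required
/// event (when the discovery policy demands approval) or a background
/// auto-connect attempt (when automatic dialing is enabled).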
fn apply_mdns_runtime_event(&self, event: MdnsRuntimeEvent) {
match event {
MdnsRuntimeEvent::ServiceAdvertised {
service,
namespace,
instance_fullname,
} => {
{
let mut snapshot = self.mdns_state.write();
snapshot.advertising = true;
snapshot.service = Some(service.clone());
snapshot.namespace = namespace.clone();
snapshot.advertised_instance_fullname = Some(instance_fullname.clone());
}
if let Err(e) = self.event_tx.send(P2pEvent::MdnsServiceAdvertised {
service,
namespace,
instance_fullname,
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_service_advertised", error = %e, "silent drop");
}
}
MdnsRuntimeEvent::PeerDiscovered(peer) => {
self.upsert_mdns_peer(&peer);
if let Err(e) = self.event_tx.send(P2pEvent::MdnsPeerDiscovered { peer }) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_peer_discovered", error = %e, "silent drop");
}
}
MdnsRuntimeEvent::PeerUpdated(peer) => {
self.upsert_mdns_peer(&peer);
if let Err(e) = self.event_tx.send(P2pEvent::MdnsPeerUpdated { peer }) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_peer_updated", error = %e, "silent drop");
}
}
MdnsRuntimeEvent::PeerRemoved(peer) => {
self.remove_mdns_peer(&peer.fullname);
if let Err(e) = self.event_tx.send(P2pEvent::MdnsPeerRemoved { peer }) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_peer_removed", error = %e, "silent drop");
}
}
MdnsRuntimeEvent::PeerEligible(peer) => {
self.upsert_mdns_peer(&peer);
let _ = self
.event_tx
.send(P2pEvent::MdnsPeerEligible { peer: peer.clone() });
let mdns_policy = self
.config
.discovery
.mdns
.as_ref()
.map(|mdns| mdns.auto_connect)
.unwrap_or(AutoConnectPolicy::Disabled);
if mdns_policy.requires_approval() {
if let Err(e) = self.event_tx.send(P2pEvent::MdnsPeerApprovalRequired {
peer,
reason: "approval required by discovery policy".to_string(),
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_approval_required", error = %e, "silent drop");
}
} else if self.mdns_auto_connect_enabled() {
self.schedule_mdns_auto_connect(peer);
}
}
MdnsRuntimeEvent::PeerIneligible { peer, reason } => {
self.remove_mdns_peer(&peer.fullname);
let _ = self
.event_tx
.send(P2pEvent::MdnsPeerIneligible { peer, reason });
}
}
}
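/// Inserts or replaces a discovered mDNS peer record, keeping the
/// snapshot's peer list sorted by instance fullname.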
fn upsert_mdns_peer(&self, peer: &MdnsPeerRecord) {
let mut snapshot = self.mdns_state.write();
if let Some(existing) = snapshot
.discovered_peers
.iter_mut()
.find(|existing| existing.fullname == peer.fullname)
{
*existing = peer.clone();
} else {
snapshot.discovered_peers.push(peer.clone());
snapshot
.discovered_peers
.sort_by(|left, right| left.fullname.cmp(&right.fullname));
}
}
fn remove_mdns_peer(&self, fullname: &str) {
let mut snapshot = self.mdns_state.write();
snapshot
.discovered_peers
.retain(|peer| peer.fullname != fullname);
}
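/// Spawns a background auto-connect attempt for an eligible mDNS peer. The
/// inflight set deduplicates concurrent attempts per instance fullname and
/// is cleared when the attempt finishes; peers that already have a live
/// connection for one of the advertised addresses are skipped.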
fn schedule_mdns_auto_connect(&self, peer: MdnsPeerRecord) {
if peer.addresses.is_empty() {
return;
}
if let Err(reason) = self.discovered_peer_allowed(peer.claimed_peer_id) {
let _ = self
.event_tx
.send(P2pEvent::MdnsPeerIneligible { peer, reason });
return;
}
{
let mut inflight = self.mdns_auto_connect_inflight.write();
if !inflight.insert(peer.fullname.clone()) {
return;
}
}
let endpoint = self.clone();
tokio::spawn(async move {
let fullname = peer.fullname.clone();
let addresses = peer.addresses.clone();
if endpoint
.find_live_connection_for_addrs(&addresses)
.await
.is_none()
{
if let Err(e) = endpoint.event_tx.send(P2pEvent::MdnsAutoConnectAttempted {
peer: peer.clone(),
addresses: addresses.clone(),
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_auto_connect_attempted", error = %e, "silent drop");
}
match endpoint.connect_orchestrated(None, addresses.clone()).await {
Ok(connection) => {
if let Err(e) = endpoint.event_tx.send(P2pEvent::MdnsAutoConnectSucceeded {
peer,
authenticated_peer_id: connection.peer_id,
remote_addr: connection.remote_addr,
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_auto_connect_succeeded", error = %e, "silent drop");
}
}
Err(error) => {
if let Err(e) = endpoint.event_tx.send(P2pEvent::MdnsAutoConnectFailed {
peer,
addresses,
error: error.to_string(),
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_mdns_auto_connect_failed", error = %e, "silent drop");
}
}
}
}
endpoint
.mdns_auto_connect_inflight
.write()
.remove(&fullname);
});
}
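/// Translates port-mapping runtime events into snapshot updates and
/// broadcast events. Established and renewed mappings also surface the
/// mapped address via `ExternalAddressDiscovered`; removal resets the
/// snapshot to its inactive default.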
fn apply_port_mapping_event(&self, event: PortMappingEvent) {
match event {
PortMappingEvent::Established { snapshot } => {
self.apply_port_mapping_snapshot(snapshot);
if let Some(mapped_addr) = snapshot.external_addr {
if let Err(e) = self.event_tx.send(P2pEvent::PortMappingEstablished {
external_addr: mapped_addr,
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_port_mapping_established", error = %e, "silent drop");
}
if let Err(e) = self.event_tx.send(P2pEvent::ExternalAddressDiscovered {
addr: TransportAddr::Udp(mapped_addr),
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_external_addr", error = %e, "silent drop");
}
}
}
PortMappingEvent::Renewed { snapshot } => {
self.apply_port_mapping_snapshot(snapshot);
if let Some(mapped_addr) = snapshot.external_addr {
if let Err(e) = self.event_tx.send(P2pEvent::PortMappingRenewed {
external_addr: mapped_addr,
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_port_mapping_renewed", error = %e, "silent drop");
}
if let Err(e) = self.event_tx.send(P2pEvent::ExternalAddressDiscovered {
addr: TransportAddr::Udp(mapped_addr),
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_external_addr", error = %e, "silent drop");
}
}
}
PortMappingEvent::Failed { error } => {
if let Err(e) = self.event_tx.send(P2pEvent::PortMappingFailed { error }) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_port_mapping_failed", error = %e, "silent drop");
}
}
PortMappingEvent::Removed { external_addr } => {
self.apply_port_mapping_snapshot(PortMappingSnapshot::default());
if let Err(e) = self
.event_tx
.send(P2pEvent::PortMappingRemoved { external_addr })
{
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_port_mapping_removed", error = %e, "silent drop");
}
}
}
}
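/// Installs a new port-mapping snapshot. The relay server's public
/// addresses are reconciled against the mapped address; if the mapped
/// address changed, the previous one is retired from the local
/// external-candidate set and a `PortMappingAddressChanged` event is
/// emitted; while the mapping is active, the new mapped address is added
/// as a local external candidate for NAT traversal.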
fn apply_port_mapping_snapshot(&self, snapshot: PortMappingSnapshot) {
let previous_addr = {
let mut current = self.port_mapping_state.write();
let previous = current.external_addr;
*current = snapshot;
previous
};
self.inner
.reconcile_relay_server_public_addresses(snapshot.external_addr);
if let Some(previous_addr) = previous_addr
&& snapshot.external_addr != Some(previous_addr)
{
let _ = self.inner.remove_local_external_candidate(previous_addr);
if let Some(mapped_addr) = snapshot.external_addr {
if let Err(e) = self.event_tx.send(P2pEvent::PortMappingAddressChanged {
previous_addr,
external_addr: mapped_addr,
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_port_mapping_addr_changed", error = %e, "silent drop");
}
}
}
if snapshot.active
&& let Some(mapped_addr) = snapshot.external_addr
&& let Err(error) = self.inner.add_local_external_candidate(mapped_addr)
{
warn!(
error = %error,
mapped_addr = %mapped_addr,
"Failed to add router-mapped address to the NAT candidate set"
);
}
}
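/// Requests graceful cancellation of the reader task registered for
/// `peer_id` at the given `generation`, if one exists.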
async fn cancel_reader_generation(&self, peer_id: &PeerId, generation: u64) {
let handles = self.reader_handles.read().await;
if let Some(handle) = handles
.get(peer_id)
.and_then(|entries| entries.iter().find(|entry| entry.generation == generation))
{
handle.cancel.cancel();
}
}
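/// Spawns a unidirectional-stream reader task for one connection to
/// `peer_id`.
///
/// The task is keyed by the connection's lifecycle generation (falling back
/// to its stable ID when no lifecycle snapshot exists) and carries a
/// cancellation token, which is pre-cancelled if the snapshot shows the
/// connection is no longer live. Frames are dispatched in order:
/// coordinator control messages, ACK control frames (resolving pending
/// waiters), probe requests (answered with an `Accepted` control frame),
/// then ACK-wrapped or raw payloads forwarded to `data_tx`. On exit the
/// task emits a `ReaderExitEvent` so the exit handler can reconcile
/// lifecycle state.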
async fn spawn_reader_task(&self, peer_id: PeerId, connection: crate::high_level::Connection) {
let data_tx = self.data_tx.clone();
let connected_peers = Arc::clone(&self.connected_peers);
let peer_activity = Arc::clone(&self.peer_activity);
let ack_waiters = Arc::clone(&self.ack_waiters);
let event_tx = self.event_tx.clone();
let inner = Arc::clone(&self.inner);
let reader_exit_tx = self.reader_exit_tx.clone();
let max_read_bytes = self.config.max_message_size;
let conn_stable_id = connection.stable_id();
let lifecycle_snapshot = self
.inner
.connection_snapshot_by_stable_id(&peer_id, conn_stable_id);
let generation = lifecycle_snapshot
.map(|snapshot| snapshot.generation)
.unwrap_or(conn_stable_id as u64);
let cancel = CancellationToken::new();
if let Some(snapshot) = lifecycle_snapshot {
debug!(
peer_id = ?peer_id,
generation = snapshot.generation,
connection_id = %hex::encode(&snapshot.connection_id[..8]),
established_at_unix_ms = snapshot.established_at_unix_ms,
state = ?snapshot.state,
"spawning reader task with lifecycle snapshot"
);
if !matches!(
snapshot.state,
crate::connection_lifecycle::ConnectionLifecycleState::Live
) {
cancel.cancel();
}
}
let reader_cancel = cancel.clone();
let join_handle = tokio::spawn(async move {
loop {
let mut recv_stream = tokio::select! {
biased;
_ = reader_cancel.cancelled() => {
debug!(
"Reader task for peer {:?} (conn stable_id={}) exiting on graceful cancel",
peer_id, conn_stable_id
);
break;
}
result = connection.accept_uni() => match result {
Ok(stream) => stream,
Err(e) => {
debug!(
"Reader task for peer {:?} (conn stable_id={}) ending: accept_uni error: {}",
peer_id, conn_stable_id, e
);
break;
}
}
};
let data = match recv_stream.read_to_end(max_read_bytes).await {
Ok(data) if data.is_empty() => continue,
Ok(data) => data,
Err(e) => {
debug!(
"Reader task for peer {:?} (conn stable_id={}): read_to_end error: {}",
peer_id, conn_stable_id, e
);
break;
}
};
let data_len = data.len();
tracing::trace!(
"Reader task: {} bytes from peer {:?} (conn stable_id={})",
data_len,
peer_id,
conn_stable_id
);
match inner
.handle_coordinator_control_message(peer_id, connection.clone(), &data)
.await
{
Ok(true) => {
tracing::trace!(
"Reader task: handled coordinator control payload from peer {:?}",
peer_id
);
continue;
}
Ok(false) => {}
Err(e) => {
tracing::warn!(
"Reader task for peer {:?}: failed to handle coordinator control payload: {}",
peer_id,
e
);
continue;
}
}
if let Some((tag, outcome)) = decode_ack_control(&data) {
let waiter_result = match outcome {
AckControlOutcome::Accepted => AckWaiterResult::Accepted,
AckControlOutcome::Rejected(reason) => AckWaiterResult::Rejected(reason),
AckControlOutcome::Closed(reason) => AckWaiterResult::Closed(reason),
};
let resolved = resolve_ack_waiter(
ack_waiters.as_ref(),
conn_stable_id,
tag,
waiter_result,
);
if !resolved {
debug!(
peer_id = ?peer_id,
conn_stable_id,
"received ACK control frame with no matching waiter"
);
}
continue;
}
if let Some(tag) = decode_probe_request(&data) {
note_peer_activity(
&connected_peers,
&peer_activity,
peer_id,
PeerActivityKind::Received,
Instant::now(),
)
.await;
Self::send_ack_control_frame(
connection.clone(),
tag,
AckControlOutcome::Accepted,
)
.await;
continue;
}
let (payload, ack_tag) = if let Some((tag, payload)) = decode_ack_payload(&data) {
(payload.to_vec(), Some(tag))
} else {
(data, None)
};
let payload_len = payload.len();
let now = Instant::now();
note_peer_activity(
&connected_peers,
&peer_activity,
peer_id,
PeerActivityKind::Received,
now,
)
.await;
if let Err(e) = event_tx.send(P2pEvent::DataReceived {
peer_id,
bytes: payload_len,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_data_received_reader",
peer_id = ?peer_id,
bytes = payload_len,
error = %e,
"HIGH: silent drop"
);
}
if data_tx.send((peer_id, payload)).await.is_err() {
if let Some(tag) = ack_tag {
Self::send_ack_control_frame(
connection.clone(),
tag,
AckControlOutcome::Rejected(ReceiveRejectReason::ConsumerGone),
)
.await;
}
debug!(
"Reader task for peer {:?}: channel closed, exiting",
peer_id
);
break;
}
if let Some(tag) = ack_tag {
Self::send_ack_control_frame(
connection.clone(),
tag,
AckControlOutcome::Accepted,
)
.await;
}
}
let _ = reader_exit_tx.send(ReaderExitEvent {
peer_id,
generation,
conn_stable_id,
});
});
let abort_handle = join_handle.abort_handle();
let mut handles = self.reader_handles.write().await;
handles.entry(peer_id).or_default().push(ReaderTaskHandle {
generation,
cancel,
abort_handle,
});
}
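/// Applies an advertised-address update observed for `peer_addr`. When the
/// address maps to a connected peer, the advertised address is merged into
/// that peer's hint record and recorded as an external address in the
/// bootstrap cache; a `PeerAddressUpdated` event is broadcast either way,
/// since the update may arrive before the peer-ID mapping exists.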
async fn apply_peer_address_update(
connected_peers: &RwLock<HashMap<PeerId, PeerConnection>>,
bootstrap_cache: &BootstrapCache,
peer_hint_records: &RwLock<HashMap<PeerId, PeerHintRecord>>,
event_tx: &broadcast::Sender<P2pEvent>,
peer_addr: SocketAddr,
advertised_addr: SocketAddr,
) {
let peer_id = connected_peers
.read()
.await
.iter()
.find(|(_, peer)| peer.remote_addr.as_socket_addr() == Some(peer_addr))
.map(|(peer_id, _)| *peer_id);
if let Some(peer_id) = peer_id {
peer_hint_records
.write()
.await
.entry(peer_id)
.or_default()
.merge(vec![advertised_addr], None);
let mut cached_peer = bootstrap_cache
.get_peer(&peer_id)
.await
.unwrap_or_else(|| CachedPeer::new(peer_id, Vec::new(), PeerSource::Merge));
cached_peer
.capabilities
.record_external_address(advertised_addr);
bootstrap_cache.upsert(cached_peer).await;
} else {
debug!(
peer_addr = %peer_addr,
advertised_addr = %advertised_addr,
"peer address update arrived before peer ID mapping was available"
);
}
if let Err(e) = event_tx.send(P2pEvent::PeerAddressUpdated {
peer_addr,
advertised_addr,
}) {
tracing::warn!(target: "ant_quic::silent_drop", kind = "event_tx_peer_addr_updated", error = %e, "silent drop");
}
}
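/// Forwards peer-address updates received from the inner endpoint to
/// `apply_peer_address_update` until shutdown or channel closure.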
fn spawn_peer_address_update_poller(&self) {
let inner = Arc::clone(&self.inner);
let connected_peers = Arc::clone(&self.connected_peers);
let bootstrap_cache = Arc::clone(&self.bootstrap_cache);
let peer_hint_records = Arc::clone(&self.peer_hint_records);
let event_tx = self.event_tx.clone();
let shutdown = self.shutdown.clone();
tokio::spawn(async move {
loop {
let update = tokio::select! {
_ = shutdown.cancelled() => break,
update = inner.recv_peer_address_update() => update,
};
let Some((peer_addr, advertised_addr)) = update else {
debug!("Peer address update channel closed, exiting poller");
break;
};
Self::apply_peer_address_update(
connected_peers.as_ref(),
bootstrap_cache.as_ref(),
peer_hint_records.as_ref(),
&event_tx,
peer_addr,
advertised_addr,
)
.await;
}
});
}
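/// Bridges events from the constrained (non-UDP) protocol engine into the
/// shared peer tables: accepted and established connections are registered
/// under a peer ID derived from the remote transport address, received
/// data is forwarded to `data_tx` (with activity bookkeeping and
/// `DataReceived` events), and closed connections remove the peer from
/// both maps.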
fn spawn_constrained_poller(&self) {
let inner = Arc::clone(&self.inner);
let data_tx = self.data_tx.clone();
let connected_peers = Arc::clone(&self.connected_peers);
let peer_activity = Arc::clone(&self.peer_activity);
let event_tx = self.event_tx.clone();
let constrained_peer_addrs = Arc::clone(&self.constrained_peer_addrs);
let constrained_connections = Arc::clone(&self.constrained_connections);
let stats = Arc::clone(&self.stats);
let shutdown = self.shutdown.clone();
async fn register_peer(
peer_id: PeerId,
connection_id: ConstrainedConnectionId,
addr: &TransportAddr,
side: Side,
constrained_connections: &RwLock<HashMap<PeerId, ConstrainedConnectionId>>,
constrained_peer_addrs: &RwLock<
HashMap<ConstrainedConnectionId, (PeerId, TransportAddr)>,
>,
connected_peers: &RwLock<HashMap<PeerId, PeerConnection>>,
stats: &RwLock<EndpointStats>,
event_tx: &broadcast::Sender<P2pEvent>,
) {
constrained_connections
.write()
.await
.insert(peer_id, connection_id);
constrained_peer_addrs
.write()
.await
.insert(connection_id, (peer_id, addr.clone()));
store_connected_peer(
connected_peers,
stats,
event_tx,
PeerConnection {
peer_id,
remote_addr: addr.clone(),
traversal_method: TraversalMethod::Direct,
side,
authenticated: false,
connected_at: Instant::now(),
last_activity: Instant::now(),
},
)
.await;
}
tokio::spawn(async move {
loop {
let wrapper = tokio::select! {
_ = shutdown.cancelled() => break,
event = inner.recv_constrained_event() => {
match event {
Some(w) => w,
None => {
debug!("Constrained event channel closed, exiting poller");
break;
}
}
}
};
match wrapper.event {
EngineEvent::DataReceived {
connection_id,
data,
} => {
let peer_id = constrained_peer_addrs
.read()
.await
.get(&connection_id)
.map(|(pid, _)| *pid)
.unwrap_or_else(|| {
peer_id_from_socket_addr(
wrapper.remote_addr.to_synthetic_socket_addr(),
)
});
let data_len = data.len();
tracing::trace!(
"Constrained poller: {} bytes from peer {:?}",
data_len,
peer_id
);
let now = Instant::now();
note_peer_activity(
&connected_peers,
&peer_activity,
peer_id,
PeerActivityKind::Received,
now,
)
.await;
if let Err(e) = event_tx.send(P2pEvent::DataReceived {
peer_id,
bytes: data_len,
}) {
tracing::warn!(
target: "ant_quic::silent_drop",
kind = "event_tx_data_received_reader",
peer_id = ?peer_id,
bytes = data_len,
error = %e,
"HIGH: silent drop"
);
}
if data_tx.send((peer_id, data)).await.is_err() {
debug!("Constrained poller: channel closed, exiting");
break;
}
}
EngineEvent::ConnectionAccepted {
connection_id,
remote_addr: _,
} => {
let peer_id = peer_id_from_transport_addr(&wrapper.remote_addr);
register_peer(
peer_id,
connection_id,
&wrapper.remote_addr,
Side::Server,
&constrained_connections,
&constrained_peer_addrs,
&connected_peers,
&stats,
&event_tx,
)
.await;
}
EngineEvent::ConnectionEstablished { connection_id } => {
if constrained_peer_addrs
.read()
.await
.get(&connection_id)
.is_none()
{
let peer_id = peer_id_from_transport_addr(&wrapper.remote_addr);
register_peer(
peer_id,
connection_id,
&wrapper.remote_addr,
Side::Client,
&constrained_connections,
&constrained_peer_addrs,
&connected_peers,
&stats,
&event_tx,
)
.await;
}
}
EngineEvent::ConnectionClosed { connection_id } => {
let peer_info = constrained_peer_addrs.write().await.remove(&connection_id);
if let Some((peer_id, addr)) = peer_info {
constrained_connections.write().await.remove(&peer_id);
let _ = remove_connected_peer(
&connected_peers,
&stats,
&event_tx,
&peer_id,
DisconnectReason::RemoteClosed,
)
.await;
debug!(
"Constrained poller: peer {:?} at {} disconnected",
peer_id, addr
);
}
}
EngineEvent::ConnectionError {
connection_id,
error,
} => {
warn!(
"Constrained poller: conn_id={}, error={}",
connection_id.value(),
error
);
}
EngineEvent::Transmit { .. } => {}
}
}
});
}
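/// Consumes `ReaderExitEvent`s and reconciles per-peer lifecycle state.
///
/// Each exit first drops the matching reader handle, then lets the inner
/// endpoint classify the outcome: `Noop` (no lifecycle entry remained),
/// `ConnectionReaped` (a superseded connection was retired and its ACK
/// waiters fail with `Superseded`), or `PeerDisconnected` (Closing/Closed
/// lifecycle events are emitted; full peer cleanup runs only after the
/// last reader for that peer has exited).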
fn spawn_reader_exit_handler(&self) {
let reader_exit_rx = Arc::clone(&self.reader_exit_rx);
let connected_peers = Arc::clone(&self.connected_peers);
let inner = Arc::clone(&self.inner);
let reader_handles = Arc::clone(&self.reader_handles);
let direct_path_statuses = Arc::clone(&self.direct_path_statuses);
let stats = Arc::clone(&self.stats);
let event_tx = self.event_tx.clone();
let peer_event_tx = self.peer_event_tx.clone();
let peer_event_channels = Arc::clone(&self.peer_event_channels);
let peer_event_generations = Arc::clone(&self.peer_event_generations);
let ack_waiters = Arc::clone(&self.ack_waiters);
let shutdown = self.shutdown.clone();
tokio::spawn(async move {
loop {
let maybe_reader_exit = tokio::select! {
_ = shutdown.cancelled() => {
debug!("Reader exit handler shutting down");
return;
}
reader_exit = async {
let mut rx = reader_exit_rx.lock().await;
rx.recv().await
} => reader_exit,
};
let Some(ReaderExitEvent {
peer_id,
generation,
conn_stable_id,
}) = maybe_reader_exit
else {
debug!("Reader exit handler stopping: all reader-exit senders dropped");
return;
};
let last_reader = {
let mut handles = reader_handles.write().await;
match handles.get_mut(&peer_id) {
Some(vec) => {
vec.retain(|h| h.generation != generation);
if vec.is_empty() {
handles.remove(&peer_id);
true
} else {
false
}
}
None => false,
}
};
let snapshot_before =
inner.connection_snapshot_by_stable_id(&peer_id, conn_stable_id);
emit_peer_lifecycle_event(
&peer_event_tx,
peer_event_channels.as_ref(),
peer_id,
PeerLifecycleEvent::ReaderExited { generation },
);
let exit_outcome = inner.handle_reader_exit(&peer_id, generation, conn_stable_id);
match exit_outcome {
crate::nat_traversal_api::ReaderExitOutcome::Noop => {
debug!(
"Reader task exited for peer {:?} (generation {}, conn stable_id {}); no lifecycle entry remained",
peer_id, generation, conn_stable_id
);
continue;
}
crate::nat_traversal_api::ReaderExitOutcome::ConnectionReaped => {
if let Some(snapshot) = snapshot_before {
fail_ack_waiters_for_connection(
ack_waiters.as_ref(),
snapshot.stable_id,
ConnectionCloseReason::Superseded,
);
match snapshot.state {
crate::connection_lifecycle::ConnectionLifecycleState::Superseded { .. }
| crate::connection_lifecycle::ConnectionLifecycleState::Live => {
emit_peer_lifecycle_event(
&peer_event_tx,
peer_event_channels.as_ref(),
peer_id,
PeerLifecycleEvent::Closed {
generation: snapshot.generation,
reason: ConnectionCloseReason::Superseded,
},
);
}
crate::connection_lifecycle::ConnectionLifecycleState::Closing { .. }
| crate::connection_lifecycle::ConnectionLifecycleState::Closed { .. } => {}
}
}
debug!(
"Reader task exited for peer {:?} (generation {}, conn stable_id {}); superseded connection reaped",
peer_id, generation, conn_stable_id
);
continue;
}
crate::nat_traversal_api::ReaderExitOutcome::PeerDisconnected {
close_reason,
} => {
emit_peer_lifecycle_event(
&peer_event_tx,
peer_event_channels.as_ref(),
peer_id,
PeerLifecycleEvent::Closing {
generation,
reason: close_reason,
},
);
emit_peer_lifecycle_event(
&peer_event_tx,
peer_event_channels.as_ref(),
peer_id,
PeerLifecycleEvent::Closed {
generation,
reason: close_reason,
},
);
fail_ack_waiters_for_connection(
ack_waiters.as_ref(),
conn_stable_id,
close_reason,
);
if !last_reader {
debug!(
"Live reader task exited for peer {:?} (generation {}, conn stable_id {}); other readers still draining, deferring peer cleanup",
peer_id, generation, conn_stable_id
);
continue;
}
debug!(
"Last live reader task for peer {:?} (generation {}, conn stable_id {}) exited — triggering cleanup",
peer_id, generation, conn_stable_id
);
do_cleanup_connection(
&*connected_peers,
&*inner,
&*reader_handles,
&*direct_path_statuses,
&*stats,
&event_tx,
&peer_event_tx,
peer_event_channels.as_ref(),
peer_event_generations.as_ref(),
ack_waiters.as_ref(),
&peer_id,
DisconnectReason::ConnectionLost,
close_reason,
)
.await;
}
}
}
});
}
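/// Sweeps the connected-peer map every 30 seconds and cleans up peers
/// whose connections the inner endpoint no longer reports as alive,
/// recording the disconnect as a timeout.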
fn spawn_stale_connection_reaper(&self) {
let connected_peers = Arc::clone(&self.connected_peers);
let inner = Arc::clone(&self.inner);
let event_tx = self.event_tx.clone();
let peer_event_tx = self.peer_event_tx.clone();
let peer_event_channels = Arc::clone(&self.peer_event_channels);
let peer_event_generations = Arc::clone(&self.peer_event_generations);
let ack_waiters = Arc::clone(&self.ack_waiters);
let stats = Arc::clone(&self.stats);
let reader_handles = Arc::clone(&self.reader_handles);
let direct_path_statuses = Arc::clone(&self.direct_path_statuses);
let shutdown = self.shutdown.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = interval.tick() => {}
_ = shutdown.cancelled() => {
debug!("Stale connection reaper shutting down");
return;
}
}
let stale_peers: Vec<PeerId> = {
let peers = connected_peers.read().await;
peers
.keys()
.filter(|id| !inner.is_peer_connected(id))
.copied()
.collect()
};
if !stale_peers.is_empty() {
info!(
"Stale connection reaper: removing {} dead connection(s)",
stale_peers.len()
);
}
for peer_id in &stale_peers {
do_cleanup_connection(
&*connected_peers,
&*inner,
&*reader_handles,
&*direct_path_statuses,
&*stats,
&event_tx,
&peer_event_tx,
peer_event_channels.as_ref(),
peer_event_generations.as_ref(),
ack_waiters.as_ref(),
peer_id,
DisconnectReason::Timeout,
ConnectionCloseReason::TimedOut,
)
.await;
}
}
});
}
}
impl Clone for P2pEndpoint {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
connected_peers: Arc::clone(&self.connected_peers),
stats: Arc::clone(&self.stats),
config: self.config.clone(),
event_tx: self.event_tx.clone(),
peer_id: self.peer_id,
public_key: self.public_key.clone(),
shutdown: self.shutdown.clone(),
pending_data: Arc::clone(&self.pending_data),
bootstrap_cache: Arc::clone(&self.bootstrap_cache),
peer_hint_records: Arc::clone(&self.peer_hint_records),
transport_registry: Arc::clone(&self.transport_registry),
router: Arc::clone(&self.router),
constrained_connections: Arc::clone(&self.constrained_connections),
constrained_peer_addrs: Arc::clone(&self.constrained_peer_addrs),
manual_known_peer_udp_addrs: Arc::clone(&self.manual_known_peer_udp_addrs),
port_mapping_state: Arc::clone(&self.port_mapping_state),
mdns_state: Arc::clone(&self.mdns_state),
mdns_auto_connect_inflight: Arc::clone(&self.mdns_auto_connect_inflight),
direct_path_statuses: Arc::clone(&self.direct_path_statuses),
data_tx: self.data_tx.clone(),
data_rx: Arc::clone(&self.data_rx),
reader_exit_tx: self.reader_exit_tx.clone(),
reader_exit_rx: Arc::clone(&self.reader_exit_rx),
reader_handles: Arc::clone(&self.reader_handles),
peer_activity: Arc::clone(&self.peer_activity),
ack_waiters: Arc::clone(&self.ack_waiters),
peer_event_tx: self.peer_event_tx.clone(),
peer_event_channels: Arc::clone(&self.peer_event_channels),
peer_event_generations: Arc::clone(&self.peer_event_generations),
coordinator_health: Arc::clone(&self.coordinator_health),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::coordinator_control::RejectionReason;
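/// Drains all events currently buffered on a broadcast receiver.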
fn collect_broadcast_events(
events: &mut tokio::sync::broadcast::Receiver<P2pEvent>,
) -> Vec<P2pEvent> {
std::iter::from_fn(|| events.try_recv().ok()).collect()
}
#[tokio::test]
async fn test_try_addrs_with_shared_stage_budget_stops_after_budget_exhaustion() {
let first: SocketAddr = "203.0.113.10:9000".parse().expect("first addr");
let second: SocketAddr = "203.0.113.11:9000".parse().expect("second addr");
let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let start = Instant::now();
let result = try_addrs_with_shared_stage_budget(
&[first, second],
"IPv4",
Duration::from_millis(40),
{
let attempts = std::sync::Arc::clone(&attempts);
move |addr| {
let attempts = std::sync::Arc::clone(&attempts);
async move {
attempts.lock().expect("attempt log").push(addr);
tokio::time::sleep(Duration::from_millis(60)).await;
Ok::<SocketAddr, EndpointError>(addr)
}
}
},
)
.await;
assert!(
result.is_none(),
"stage budget exhaustion should stop the family"
);
assert_eq!(attempts.lock().expect("attempt log").as_slice(), &[first]);
assert!(
Instant::now().duration_since(start) < Duration::from_millis(120),
"shared family budgeting should stop before retrying the second address"
);
}
#[tokio::test]
async fn test_try_addrs_with_shared_stage_budget_allows_later_success_before_deadline() {
let first: SocketAddr = "203.0.113.20:9000".parse().expect("first addr");
let second: SocketAddr = "203.0.113.21:9000".parse().expect("second addr");
let attempts = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let result = try_addrs_with_shared_stage_budget(
&[first, second],
"IPv4",
Duration::from_millis(80),
{
let attempts = std::sync::Arc::clone(&attempts);
move |addr| {
let attempts = std::sync::Arc::clone(&attempts);
async move {
attempts.lock().expect("attempt log").push(addr);
tokio::time::sleep(Duration::from_millis(10)).await;
if addr == first {
Err(EndpointError::Connection(
"first candidate failed".to_string(),
))
} else {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(addr)
}
}
}
},
)
.await;
assert_eq!(result, Some(second));
assert_eq!(
attempts.lock().expect("attempt log").as_slice(),
&[first, second]
);
}
#[tokio::test]
async fn test_adaptive_strategy_config_for_candidates_uses_same_budget_model() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x77; 32]);
let direct_addr: SocketAddr = "203.0.113.77:9000".parse().expect("direct addr");
let mut cached_peer = CachedPeer::new(peer_id, vec![direct_addr], PeerSource::Seed);
cached_peer.stats.avg_rtt_ms = 420;
endpoint.bootstrap_cache.upsert(cached_peer).await;
let (config, rtt_hint) = endpoint
.adaptive_strategy_config_for_candidates(Some(peer_id), &[direct_addr], None, &[])
.await;
let mut expected = StrategyConfig::default();
expected.apply_adaptive_timeouts(
endpoint
.config
.timeouts
.nat_traversal
.connection_establishment_timeout,
endpoint.config.timeouts.nat_traversal.coordination_timeout,
Some(Duration::from_millis(420)),
);
assert_eq!(rtt_hint, Some(Duration::from_millis(420)));
assert_eq!(config.ipv4_timeout, expected.ipv4_timeout);
assert_eq!(config.ipv6_timeout, expected.ipv6_timeout);
assert_eq!(config.holepunch_timeout, expected.holepunch_timeout);
assert_eq!(config.relay_timeout, expected.relay_timeout);
endpoint.shutdown().await;
}
#[test]
fn test_strategy_rtt_hint_from_cached_peers_prefers_slowest_matching_path() {
let direct_addr: SocketAddr = "203.0.113.10:9000".parse().expect("direct addr");
let relay_addr: SocketAddr = "198.51.100.20:9443".parse().expect("relay addr");
let now = std::time::SystemTime::UNIX_EPOCH;
let direct_peer = CachedPeer {
peer_id: PeerId([0x11; 32]),
addresses: vec![direct_addr],
capabilities: PeerCapabilities::default(),
first_seen: now,
last_seen: now,
last_attempt: None,
stats: crate::bootstrap_cache::ConnectionStats {
avg_rtt_ms: 120,
..Default::default()
},
quality_score: 0.5,
source: PeerSource::Seed,
relay_paths: Vec::new(),
token: None,
};
let relay_peer = CachedPeer {
peer_id: PeerId([0x22; 32]),
addresses: vec![relay_addr],
capabilities: PeerCapabilities::default(),
first_seen: now,
last_seen: now,
last_attempt: None,
stats: crate::bootstrap_cache::ConnectionStats {
avg_rtt_ms: 480,
..Default::default()
},
quality_score: 0.5,
source: PeerSource::Seed,
relay_paths: Vec::new(),
token: None,
};
let unrelated_peer = CachedPeer {
peer_id: PeerId([0x33; 32]),
addresses: vec!["192.0.2.99:9999".parse().expect("other addr")],
capabilities: PeerCapabilities::default(),
first_seen: now,
last_seen: now,
last_attempt: None,
stats: crate::bootstrap_cache::ConnectionStats {
avg_rtt_ms: 900,
..Default::default()
},
quality_score: 0.5,
source: PeerSource::Seed,
relay_paths: Vec::new(),
token: None,
};
let hint = strategy_rtt_hint_from_cached_peers(
&[direct_peer, relay_peer, unrelated_peer],
&[direct_addr, relay_addr],
)
.expect("matching peers should yield an RTT hint");
assert_eq!(hint, Duration::from_millis(480));
}
#[test]
fn test_endpoint_stats_default() {
let stats = EndpointStats::default();
assert_eq!(stats.active_connections, 0);
assert_eq!(stats.successful_connections, 0);
assert_eq!(stats.nat_traversal_attempts, 0);
}
#[tokio::test]
async fn test_ack_waiter_cleanup_on_connection_failure() {
let ack_waiters = ParkingRwLock::new(HashMap::new());
let (tx, rx) = oneshot::channel();
let stable_id = 42usize;
let tag = [0xAA; 16];
assert!(register_ack_waiter(&ack_waiters, stable_id, tag, tx));
fail_ack_waiters_for_connection(&ack_waiters, stable_id, ConnectionCloseReason::TimedOut);
match rx.await.expect("ack waiter result") {
AckWaiterResult::Closed(ConnectionCloseReason::TimedOut) => {}
other => panic!("unexpected waiter result: {other:?}"),
}
assert!(ack_waiters.read().is_empty());
}
#[test]
fn test_connection_health_observation_never_seen_patterns() {
let now = Instant::now();
let health =
ConnectionHealth::from_observation(ConnectionHealthObservation::default(), now);
assert!(!health.connected);
assert_eq!(health.generation, None);
assert_eq!(health.reader_task_active, None);
assert_eq!(health.last_received_at, None);
assert_eq!(health.last_sent_at, None);
assert_eq!(health.idle_for, None);
assert_eq!(health.close_reason, None);
}
#[test]
fn test_connection_health_observation_connected_patterns() {
let now = Instant::now();
let last_sent_at = now
.checked_sub(Duration::from_secs(3))
.expect("sent instant");
let last_received_at = now
.checked_sub(Duration::from_secs(1))
.expect("received instant");
let health = ConnectionHealth::from_observation(
ConnectionHealthObservation {
connected: true,
generation: Some(42),
reader_task_active: Some(true),
last_received_at: Some(last_received_at),
last_sent_at: Some(last_sent_at),
close_reason: None,
},
now,
);
assert!(health.connected);
assert_eq!(health.generation, Some(42));
assert_eq!(health.reader_task_active, Some(true));
assert_eq!(health.last_received_at, Some(last_received_at));
assert_eq!(health.last_sent_at, Some(last_sent_at));
assert_eq!(health.idle_for, Some(Duration::from_secs(1)));
assert_eq!(health.close_reason, None);
}
#[test]
fn test_connection_health_observation_closing_patterns() {
let now = Instant::now();
let health = ConnectionHealth::from_observation(
ConnectionHealthObservation {
connected: false,
generation: None,
reader_task_active: None,
last_received_at: None,
last_sent_at: Some(
now.checked_sub(Duration::from_secs(2))
.expect("sent instant"),
),
close_reason: Some(ConnectionCloseReason::ReaderExit),
},
now,
);
assert!(!health.connected);
assert_eq!(health.generation, None);
assert_eq!(health.reader_task_active, None);
assert!(health.last_sent_at.is_some());
assert_eq!(health.idle_for, None);
assert_eq!(health.close_reason, Some(ConnectionCloseReason::ReaderExit));
}
#[test]
fn test_connection_health_observation_closed_patterns() {
let now = Instant::now();
let health = ConnectionHealth::from_observation(
ConnectionHealthObservation {
connected: false,
generation: None,
reader_task_active: None,
last_received_at: Some(
now.checked_sub(Duration::from_secs(4))
.expect("received instant"),
),
last_sent_at: None,
close_reason: Some(ConnectionCloseReason::LifecycleCleanup),
},
now,
);
assert!(!health.connected);
assert_eq!(health.generation, None);
assert_eq!(health.reader_task_active, None);
assert!(health.last_received_at.is_some());
assert_eq!(health.idle_for, None);
assert_eq!(
health.close_reason,
Some(ConnectionCloseReason::LifecycleCleanup)
);
}
#[tokio::test]
async fn test_record_connection_established_updates_direct_server_stats_once() {
let stats = RwLock::new(EndpointStats::default());
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(4);
let remote_addr: SocketAddr = "127.0.0.1:9000".parse().expect("valid addr");
let peer_conn = PeerConnection {
peer_id: PeerId([0x11; 32]),
remote_addr: TransportAddr::Udp(remote_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Server,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
record_connection_established(&stats, &event_tx, &peer_conn, None).await;
let stats = stats.read().await;
assert_eq!(stats.active_connections, 1);
assert_eq!(stats.successful_connections, 1);
assert_eq!(stats.direct_connections, 1);
assert_eq!(stats.relayed_connections, 0);
assert_eq!(stats.active_direct_incoming_connections, 1);
assert!(stats.last_direct_loopback_at.is_some());
drop(stats);
match event_rx.recv().await.expect("peer connected event") {
P2pEvent::PeerConnected {
peer_id,
addr,
side,
traversal_method,
} => {
assert_eq!(peer_id, peer_conn.peer_id);
assert_eq!(addr, peer_conn.remote_addr);
assert_eq!(side, Side::Server);
assert_eq!(traversal_method, TraversalMethod::Direct);
}
other => panic!("unexpected event: {:?}", other),
}
}
#[tokio::test]
async fn test_record_connection_established_updates_relay_stats_once() {
let stats = RwLock::new(EndpointStats::default());
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(4);
let remote_addr: SocketAddr = "203.0.113.10:9443".parse().expect("valid addr");
let peer_conn = PeerConnection {
peer_id: PeerId([0x22; 32]),
remote_addr: TransportAddr::Udp(remote_addr),
traversal_method: TraversalMethod::Relay,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
record_connection_established(&stats, &event_tx, &peer_conn, None).await;
let stats = stats.read().await;
assert_eq!(stats.active_connections, 1);
assert_eq!(stats.successful_connections, 1);
assert_eq!(stats.direct_connections, 0);
assert_eq!(stats.relayed_connections, 1);
assert_eq!(stats.active_direct_incoming_connections, 0);
drop(stats);
match event_rx.recv().await.expect("peer connected event") {
P2pEvent::PeerConnected {
peer_id,
addr,
side,
traversal_method,
} => {
assert_eq!(peer_id, peer_conn.peer_id);
assert_eq!(addr, peer_conn.remote_addr);
assert_eq!(side, Side::Client);
assert_eq!(traversal_method, TraversalMethod::Relay);
}
other => panic!("unexpected event: {:?}", other),
}
}
#[test]
fn test_should_retry_hole_punch_reason_distinguishes_retryable_and_terminal_reasons() {
assert!(P2pEndpoint::should_retry_hole_punch_reason(
&TraversalFailureReason::CoordinatorUnavailable
));
assert!(P2pEndpoint::should_retry_hole_punch_reason(
&TraversalFailureReason::NetworkError("transient".to_string())
));
assert!(!P2pEndpoint::should_retry_hole_punch_reason(
&TraversalFailureReason::CoordinationRejected {
reason: RejectionReason::RateLimited,
}
));
assert!(!P2pEndpoint::should_retry_hole_punch_reason(
&TraversalFailureReason::ProtocolViolation("bad state".to_string())
));
}
#[test]
fn test_traversal_failure_reason_from_nat_error_maps_representative_hole_punch_failures() {
assert!(matches!(
TraversalFailureReason::from_public_operation_error(
&NatTraversalError::NoBootstrapNodes
),
Some(TraversalFailureReason::CoordinatorUnavailable)
));
assert!(matches!(
TraversalFailureReason::from_public_operation_error(
&NatTraversalError::HolePunchingFailed
),
Some(TraversalFailureReason::PunchWindowMissed)
));
assert!(matches!(
TraversalFailureReason::from_public_operation_error(&NatTraversalError::ProtocolError(
"malformed".to_string()
)),
Some(TraversalFailureReason::ProtocolViolation(message)) if message == "malformed"
));
}
#[test]
fn test_endpoint_error_from_traversal_failure_maps_typed_terminal_outcome() {
let error = P2pEndpoint::endpoint_error_from_traversal_failure(
TraversalFailureReason::CoordinationRejected {
reason: RejectionReason::Expired,
},
);
assert!(matches!(
error,
EndpointError::NatTraversal(NatTraversalError::CoordinationFailed(message))
if message.contains("request expired")
));
}
#[test]
fn test_hole_punch_await_error_display_uses_typed_reason() {
let error =
HolePunchAwaitError::TraversalFailure(TraversalFailureReason::CoordinationRejected {
reason: RejectionReason::Expired,
});
assert_eq!(error.to_string(), "coordination rejected: request expired");
}
#[test]
fn test_hole_punch_await_error_preserves_typed_retry_classification() {
let rejected =
HolePunchAwaitError::TraversalFailure(TraversalFailureReason::CoordinationRejected {
reason: RejectionReason::RateLimited,
});
assert!(matches!(
rejected.retry_reason(),
Some(TraversalFailureReason::CoordinationRejected {
reason: RejectionReason::RateLimited,
})
));
assert!(
!rejected
.retry_reason()
.is_some_and(P2pEndpoint::should_retry_hole_punch_reason)
);
let sync_expired =
HolePunchAwaitError::TraversalFailure(TraversalFailureReason::SynchronizationExpired);
assert!(matches!(
sync_expired.retry_reason(),
Some(TraversalFailureReason::SynchronizationExpired)
));
assert!(
!sync_expired
.retry_reason()
.is_some_and(P2pEndpoint::should_retry_hole_punch_reason)
);
let punch_missed =
HolePunchAwaitError::TraversalFailure(TraversalFailureReason::PunchWindowMissed);
assert!(
punch_missed
.retry_reason()
.is_some_and(P2pEndpoint::should_retry_hole_punch_reason)
);
}
#[tokio::test]
async fn test_record_connection_established_updates_hole_punch_stats_once() {
let stats = RwLock::new(EndpointStats::default());
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(4);
let remote_addr: SocketAddr = "198.51.100.44:9443".parse().expect("valid addr");
let peer_conn = PeerConnection {
peer_id: PeerId([0x23; 32]),
remote_addr: TransportAddr::Udp(remote_addr),
traversal_method: TraversalMethod::HolePunch,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
record_connection_established(&stats, &event_tx, &peer_conn, None).await;
let stats = stats.read().await;
assert_eq!(stats.active_connections, 1);
assert_eq!(stats.successful_connections, 1);
assert_eq!(stats.direct_connections, 0);
assert_eq!(stats.relayed_connections, 0);
assert_eq!(stats.active_direct_incoming_connections, 0);
drop(stats);
match event_rx.recv().await.expect("peer connected event") {
P2pEvent::PeerConnected {
peer_id,
addr,
side,
traversal_method,
} => {
assert_eq!(peer_id, peer_conn.peer_id);
assert_eq!(addr, peer_conn.remote_addr);
assert_eq!(side, Side::Client);
assert_eq!(traversal_method, TraversalMethod::HolePunch);
}
other => panic!("unexpected event: {:?}", other),
}
}
#[tokio::test]
async fn test_bridge_nat_traversal_event_does_not_emit_peer_connected() {
let stats = RwLock::new(EndpointStats::default());
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(4);
let direct_path_statuses = ParkingRwLock::new(HashMap::new());
let peer_id = PeerId([0x33; 32]);
let remote_addr: SocketAddr = "198.51.100.7:9001".parse().expect("valid addr");
bridge_nat_traversal_event(
&stats,
&event_tx,
&direct_path_statuses,
NatTraversalEvent::ConnectionEstablished {
peer_id,
remote_address: remote_addr,
side: Side::Client,
},
)
.await;
let stats = stats.read().await;
assert_eq!(stats.nat_traversal_successes, 1);
assert_eq!(stats.active_connections, 0);
assert_eq!(stats.successful_connections, 0);
drop(stats);
let collected = collect_broadcast_events(&mut event_rx);
assert!(
!collected
.iter()
.any(|event| matches!(event, P2pEvent::PeerConnected { .. })),
"NAT traversal bridge should not emit PeerConnected directly"
);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::DirectPathStatus {
peer_id: observed_peer_id,
status: DirectPathStatus::Established { remote_addr: observed_addr },
} if *observed_peer_id == peer_id && *observed_addr == remote_addr
)));
assert_eq!(
direct_path_statuses.read().get(&peer_id),
Some(&DirectPathStatus::Established { remote_addr })
);
}
#[tokio::test]
async fn test_bridge_nat_traversal_failure_surfaces_best_effort_status() {
let stats = RwLock::new(EndpointStats::default());
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(4);
let direct_path_statuses = ParkingRwLock::new(HashMap::new());
let peer_id = PeerId([0x34; 32]);
bridge_nat_traversal_event(
&stats,
&event_tx,
&direct_path_statuses,
NatTraversalEvent::TraversalFailed {
peer_id,
error: NatTraversalError::HolePunchingFailed,
fallback_available: true,
},
)
.await;
let stats = stats.read().await;
assert_eq!(stats.failed_connections, 1);
drop(stats);
let collected = collect_broadcast_events(&mut event_rx);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::DirectPathStatus {
peer_id: observed_peer_id,
status: DirectPathStatus::BestEffortUnavailable {
reason: DirectPathUnavailableReason::NatUnreachable,
},
} if *observed_peer_id == peer_id
)));
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::NatTraversalProgress {
peer_id: observed_peer_id,
phase: TraversalPhase::Failed,
} if *observed_peer_id == peer_id
)));
assert_eq!(
direct_path_statuses.read().get(&peer_id),
Some(&DirectPathStatus::BestEffortUnavailable {
reason: DirectPathUnavailableReason::NatUnreachable,
})
);
}
#[tokio::test]
async fn test_bridge_nat_traversal_terminated_is_authoritative_over_legacy_failed() {
let stats = RwLock::new(EndpointStats::default());
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(8);
let direct_path_statuses = ParkingRwLock::new(HashMap::new());
let peer_id = PeerId([0x35; 32]);
bridge_nat_traversal_event(
&stats,
&event_tx,
&direct_path_statuses,
NatTraversalEvent::TraversalTerminated {
peer_id,
reason: TraversalFailureReason::PunchWindowMissed,
fallback_available: true,
},
)
.await;
bridge_nat_traversal_event(
&stats,
&event_tx,
&direct_path_statuses,
NatTraversalEvent::TraversalFailed {
peer_id,
error: NatTraversalError::HolePunchingFailed,
fallback_available: true,
},
)
.await;
let stats = stats.read().await;
assert_eq!(stats.failed_connections, 1);
drop(stats);
let collected = collect_broadcast_events(&mut event_rx);
assert_eq!(
collected
.iter()
.filter(|event| matches!(
event,
P2pEvent::NatTraversalProgress {
peer_id: observed_peer_id,
phase: TraversalPhase::Failed,
} if *observed_peer_id == peer_id
))
.count(),
1
);
assert_eq!(
collected
.iter()
.filter(|event| matches!(
event,
P2pEvent::DirectPathStatus {
peer_id: observed_peer_id,
status: DirectPathStatus::BestEffortUnavailable {
reason: DirectPathUnavailableReason::NatUnreachable,
},
} if *observed_peer_id == peer_id
))
.count(),
1
);
}
#[tokio::test]
async fn test_record_connection_established_replacement_does_not_double_count() {
let stats = RwLock::new(EndpointStats {
active_connections: 1,
successful_connections: 1,
relayed_connections: 1,
..EndpointStats::default()
});
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(4);
let previous = PeerConnection {
peer_id: PeerId([0x44; 32]),
remote_addr: TransportAddr::Udp("203.0.113.20:9443".parse().expect("valid addr")),
traversal_method: TraversalMethod::Relay,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
let replacement = PeerConnection {
peer_id: previous.peer_id,
remote_addr: TransportAddr::Udp("127.0.0.1:9443".parse().expect("valid addr")),
traversal_method: TraversalMethod::Direct,
side: Side::Server,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
record_connection_established(&stats, &event_tx, &replacement, Some(&previous)).await;
let stats = stats.read().await;
assert_eq!(stats.active_connections, 1);
assert_eq!(stats.successful_connections, 1);
assert_eq!(stats.direct_connections, 1);
assert_eq!(stats.relayed_connections, 1);
assert_eq!(stats.active_direct_incoming_connections, 1);
drop(stats);
match event_rx.recv().await.expect("peer connected event") {
P2pEvent::PeerConnected {
peer_id,
addr,
side,
traversal_method,
} => {
assert_eq!(peer_id, replacement.peer_id);
assert_eq!(addr, replacement.remote_addr);
assert_eq!(side, Side::Server);
assert_eq!(traversal_method, TraversalMethod::Direct);
}
other => panic!("unexpected event: {:?}", other),
}
}
#[tokio::test]
async fn test_record_connection_established_identical_replacement_is_quiet() {
let stats = RwLock::new(EndpointStats {
active_connections: 1,
successful_connections: 1,
direct_connections: 1,
active_direct_incoming_connections: 1,
..EndpointStats::default()
});
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel(4);
let previous = PeerConnection {
peer_id: PeerId([0x55; 32]),
remote_addr: TransportAddr::Udp("127.0.0.1:9555".parse().expect("valid addr")),
traversal_method: TraversalMethod::Direct,
side: Side::Server,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
let replacement = PeerConnection {
connected_at: Instant::now(),
last_activity: Instant::now(),
..previous.clone()
};
record_connection_established(&stats, &event_tx, &replacement, Some(&previous)).await;
let stats = stats.read().await;
assert_eq!(stats.active_connections, 1);
assert_eq!(stats.successful_connections, 1);
assert_eq!(stats.direct_connections, 1);
assert_eq!(stats.active_direct_incoming_connections, 1);
drop(stats);
assert!(matches!(
event_rx.try_recv(),
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
));
}
#[tokio::test]
async fn test_cleanup_connection_removes_direct_path_status_and_emits_disconnect() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x56; 32]);
let mut events = endpoint.subscribe();
endpoint
.register_connected_peer(PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp("127.0.0.1:9556".parse().expect("valid addr")),
traversal_method: TraversalMethod::Direct,
side: Side::Server,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
})
.await;
endpoint.direct_path_statuses.write().insert(
peer_id,
DirectPathStatus::Established {
remote_addr: "127.0.0.1:9556".parse().expect("valid addr"),
},
);
endpoint
.cleanup_connection(&peer_id, DisconnectReason::ConnectionLost)
.await;
assert!(endpoint.direct_path_status(peer_id).is_none());
assert!(!endpoint.is_connected(&peer_id).await);
assert_eq!(endpoint.stats().await.active_connections, 0);
let collected = collect_broadcast_events(&mut events);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::PeerDisconnected {
peer_id: observed_peer_id,
reason: DisconnectReason::ConnectionLost,
} if *observed_peer_id == peer_id
)));
endpoint.shutdown().await;
}
#[test]
fn test_connection_metrics_default() {
let metrics = ConnectionMetrics::default();
assert_eq!(metrics.bytes_sent, 0);
assert_eq!(metrics.bytes_received, 0);
assert!(metrics.rtt.is_none());
assert_eq!(metrics.packet_loss, 0.0);
}
#[test]
fn test_peer_connection_debug() {
let socket_addr: SocketAddr = "127.0.0.1:8080".parse().expect("valid addr");
let conn = PeerConnection {
peer_id: PeerId([0u8; 32]),
remote_addr: TransportAddr::Udp(socket_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: false,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
let debug_str = format!("{:?}", conn);
assert!(debug_str.contains("PeerConnection"));
}
#[test]
fn test_disconnect_reason_debug() {
let reason = DisconnectReason::Normal;
assert!(format!("{:?}", reason).contains("Normal"));
let reason = DisconnectReason::ProtocolError("test".to_string());
assert!(format!("{:?}", reason).contains("test"));
}
#[test]
fn test_traversal_phase_debug() {
let phase = TraversalPhase::Discovery;
assert!(format!("{:?}", phase).contains("Discovery"));
}
#[test]
fn test_endpoint_error_display() {
let err = EndpointError::Timeout;
assert!(err.to_string().contains("timed out"));
let err = EndpointError::PeerNotFound(PeerId([0u8; 32]));
assert!(err.to_string().contains("not found"));
}
#[tokio::test]
async fn test_endpoint_creation() {
let config = P2pConfig::builder().build().expect("valid config");
let result = P2pEndpoint::new(config).await;
if let Ok(endpoint) = result {
assert!(endpoint.is_running());
// `local_addr()` may legitimately be `None` depending on transport
// availability, so nothing is asserted about it here.
}
}
#[tokio::test]
async fn test_p2p_endpoint_stores_transport_registry() {
use crate::transport::TransportType;
let config = P2pConfig::builder().build().expect("valid config");
let result = P2pEndpoint::new(config).await;
if let Ok(endpoint) = result {
let registry = endpoint.transport_registry();
assert!(
!registry.is_empty(),
"Registry should have at least 1 provider"
);
let udp_providers = registry.providers_by_type(TransportType::Udp);
assert_eq!(udp_providers.len(), 1, "Should have 1 UDP provider");
}
}
#[tokio::test]
async fn test_p2p_endpoint_default_config_has_udp_registry() {
let config = P2pConfig::builder().build().expect("valid config");
let result = P2pEndpoint::new(config).await;
if let Ok(endpoint) = result {
let registry = endpoint.transport_registry();
assert!(
!registry.is_empty(),
"Default registry should have UDP for socket sharing"
);
assert!(
registry.has_quic_capable_transport(),
"Default registry should have QUIC-capable transport"
);
}
}
#[tokio::test]
async fn test_port_mapping_disabled_mode_starts_cleanly() {
let config = P2pConfig::builder()
.port_mapping_enabled(false)
.build()
.expect("valid config");
if let Ok(endpoint) = P2pEndpoint::new(config).await {
assert!(!endpoint.port_mapping_active());
assert_eq!(endpoint.port_mapping_addr(), None);
endpoint.shutdown().await;
}
}
#[tokio::test]
async fn test_port_mapping_candidate_propagates_to_external_addresses() {
let config = P2pConfig::builder()
.port_mapping_enabled(false)
.build()
.expect("valid config");
if let Ok(endpoint) = P2pEndpoint::new(config).await {
let mapped_addr: SocketAddr = "198.51.100.55:41000".parse().expect("valid addr");
endpoint.apply_port_mapping_snapshot(PortMappingSnapshot {
active: true,
external_addr: Some(mapped_addr),
});
assert!(endpoint.port_mapping_active());
assert_eq!(endpoint.port_mapping_addr(), Some(mapped_addr));
assert!(endpoint.all_external_addrs().contains(&mapped_addr));
assert_eq!(
endpoint.inner.relay_server_public_address(),
Some(mapped_addr)
);
endpoint.shutdown().await;
}
}
#[tokio::test]
async fn test_port_mapping_event_surfaces_lifecycle_and_external_address() {
let config = P2pConfig::builder()
.port_mapping_enabled(false)
.build()
.expect("valid config");
if let Ok(endpoint) = P2pEndpoint::new(config).await {
let mut events = endpoint.subscribe();
let mapped_addr: SocketAddr = "198.51.100.88:42000".parse().expect("valid addr");
endpoint.apply_port_mapping_event(PortMappingEvent::Established {
snapshot: PortMappingSnapshot {
active: true,
external_addr: Some(mapped_addr),
},
});
endpoint.apply_port_mapping_event(PortMappingEvent::Failed {
error: "simulated failure".to_string(),
});
endpoint.apply_port_mapping_event(PortMappingEvent::Removed {
external_addr: Some(mapped_addr),
});
let collected = collect_broadcast_events(&mut events);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::PortMappingEstablished { external_addr }
if *external_addr == mapped_addr
)));
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::ExternalAddressDiscovered { addr }
if addr.as_socket_addr() == Some(mapped_addr)
)));
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::PortMappingFailed { error } if error == "simulated failure"
)));
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::PortMappingRemoved { external_addr }
if *external_addr == Some(mapped_addr)
)));
endpoint.shutdown().await;
}
}
#[tokio::test]
async fn test_port_mapping_address_change_event_surfaces() {
let config = P2pConfig::builder()
.port_mapping_enabled(false)
.build()
.expect("valid config");
if let Ok(endpoint) = P2pEndpoint::new(config).await {
let mut events = endpoint.subscribe();
let first_addr: SocketAddr = "198.51.100.90:42000".parse().expect("valid addr");
let second_addr: SocketAddr = "198.51.100.91:42000".parse().expect("valid addr");
endpoint.apply_port_mapping_snapshot(PortMappingSnapshot {
active: true,
external_addr: Some(first_addr),
});
endpoint.apply_port_mapping_snapshot(PortMappingSnapshot {
active: true,
external_addr: Some(second_addr),
});
let collected = collect_broadcast_events(&mut events);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::PortMappingAddressChanged {
previous_addr,
external_addr,
} if *previous_addr == first_addr && *external_addr == second_addr
)));
endpoint.shutdown().await;
}
}
#[cfg(all(feature = "platform-verifier", feature = "network-discovery"))]
#[tokio::test]
async fn test_port_mapping_startup_failure_is_non_fatal_for_endpoint_connectivity() {
let listener = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("listener config should build"),
)
.await
.expect("listener should bind");
let listener_addr = localhost_addr(listener.local_addr().expect("listener addr"));
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.build()
.expect("endpoint config should build"),
)
.await
.expect("endpoint should bind");
let mut events = endpoint.subscribe();
endpoint.apply_port_mapping_event(PortMappingEvent::Failed {
error: "startup mapping failed".to_string(),
});
let accept_handle = tokio::spawn({
let listener = listener.clone();
async move {
tokio::time::timeout(Duration::from_secs(20), listener.accept())
.await
.expect("listener accept should not time out")
}
});
let connection = tokio::time::timeout(
Duration::from_secs(20),
endpoint.connect_addr(listener_addr),
)
.await
.expect("direct connect should not time out")
.expect("direct connect should succeed");
assert_eq!(connection.remote_addr.as_socket_addr(), Some(listener_addr));
assert!(!endpoint.port_mapping_active());
assert_eq!(endpoint.port_mapping_addr(), None);
let collected = collect_broadcast_events(&mut events);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::PortMappingFailed { error } if error == "startup mapping failed"
)));
endpoint.shutdown().await;
listener.shutdown().await;
let _ = accept_handle.await;
}
#[tokio::test]
async fn test_port_mapping_removal_recomputes_relay_public_address_from_observed_address() {
let config = P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("valid config");
let listener = P2pEndpoint::new(config)
.await
.expect("listener should create");
let private_observed_addr: SocketAddr = "10.0.0.1:42000".parse().expect("valid addr");
let observed_addr: SocketAddr = "203.0.113.88:42000".parse().expect("valid addr");
listener
.inner
.set_test_observed_external_addrs(vec![private_observed_addr, observed_addr]);
let mapped_addr: SocketAddr = "198.51.100.55:41000".parse().expect("valid addr");
listener.apply_port_mapping_snapshot(PortMappingSnapshot {
active: true,
external_addr: Some(mapped_addr),
});
assert_eq!(
listener.inner.relay_server_public_address(),
Some(mapped_addr)
);
listener.apply_port_mapping_snapshot(PortMappingSnapshot::default());
assert_eq!(
listener.inner.relay_server_public_address(),
Some(observed_addr)
);
listener.shutdown().await;
}
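// Relay-side behaviour: a relay address learned after startup is still
// advertised to peers that connect later, and the runtime assist snapshot
// reports active relay sessions and bytes forwarded.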
#[tokio::test]
async fn test_active_relay_is_advertised_to_future_connected_peers() {
let config = P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("valid config");
let relay_endpoint = P2pEndpoint::new(config)
.await
.expect("relay endpoint should create");
relay_endpoint
.inner
.set_test_relay_public_addr("198.51.100.200:45000".parse().expect("valid addr"));
let future_peer = PeerConnection {
peer_id: PeerId([0x90; 32]),
remote_addr: TransportAddr::Udp("127.0.0.1:45001".parse().expect("valid addr")),
traversal_method: TraversalMethod::Direct,
side: Side::Server,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
relay_endpoint
.register_connected_peer(future_peer.clone())
.await;
assert!(
relay_endpoint
.inner
.test_relay_publish_attempted_for(future_peer.peer_id),
"future connected peers should trigger proactive relay re-advertisement"
);
relay_endpoint.shutdown().await;
}
#[tokio::test]
async fn test_runtime_assist_snapshot_reports_relay_bytes_forwarded() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("valid config"),
)
.await
.expect("endpoint should create");
endpoint.inner.record_test_relay_server_activity(2, 4096);
let snapshot = endpoint.runtime_assist_snapshot().await;
assert_eq!(snapshot.active_relay_sessions, 2);
assert_eq!(snapshot.relay_bytes_forwarded, 4096);
endpoint.shutdown().await;
}
#[tokio::test]
async fn test_relay_service_enabled_reports_effective_runtime_when_legacy_flag_is_disabled() {
let mut config = P2pConfig::builder()
.port_mapping_enabled(false)
.build()
.expect("valid config");
config.nat.enable_relay_service = false;
let endpoint = P2pEndpoint::new(config)
.await
.expect("endpoint should create");
assert!(
endpoint.relay_service_enabled(),
"status should reflect the always-on relay runtime"
);
endpoint.shutdown().await;
}
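/// Maps an unspecified bind address (e.g. 0.0.0.0) to loopback while keeping
/// the bound port, so tests can dial a locally bound listener.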
fn localhost_addr(addr: SocketAddr) -> SocketAddr {
if addr.ip().is_unspecified() {
SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), addr.port())
} else {
addr
}
}
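// Candidate-ordering tests: global addresses must sort ahead of private and
// loopback ones, and non-global candidates are dropped only when at least
// one global candidate is present.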
#[test]
fn test_prioritize_direct_candidate_addrs_prefers_global_addresses() {
let private_v4: SocketAddr = "10.0.0.1:5483".parse().expect("valid addr");
let global_v4: SocketAddr = "198.51.100.20:5483".parse().expect("valid addr");
let global_v6: SocketAddr = "[2001:db8::20]:5483".parse().expect("valid addr");
let loopback: SocketAddr = "127.0.0.1:5483".parse().expect("valid addr");
let mut addrs = vec![private_v4, loopback, global_v4, global_v6];
prioritize_direct_candidate_addrs(&mut addrs);
assert_eq!(addrs[0], global_v6);
assert_eq!(addrs[1], global_v4);
assert_eq!(addrs[2], private_v4);
assert_eq!(addrs[3], loopback);
}
#[test]
fn test_drop_non_global_direct_candidates_when_global_present() {
let private_v4: SocketAddr = "10.200.0.1:5483".parse().expect("valid addr");
let global_v4: SocketAddr = "198.51.100.20:5483".parse().expect("valid addr");
let global_v6: SocketAddr = "[2001:db8::20]:5483".parse().expect("valid addr");
let link_local: SocketAddr = "169.254.10.1:5483".parse().expect("valid addr");
let loopback: SocketAddr = "127.0.0.1:5483".parse().expect("valid addr");
let mut addrs = vec![private_v4, link_local, loopback, global_v4, global_v6];
drop_non_global_direct_candidates_when_global_present(&mut addrs);
addrs.sort();
let expected = {
let mut v = vec![global_v4, global_v6];
v.sort();
v
};
assert_eq!(
addrs, expected,
"private/link-local/loopback must be dropped when Global candidates are present"
);
}
#[test]
fn test_drop_non_global_direct_candidates_preserves_lan_only_list() {
let private_v4: SocketAddr = "192.168.1.25:5483".parse().expect("valid addr");
let link_local_v6: SocketAddr = "[fe80::1]:5483".parse().expect("valid addr");
let original = vec![private_v4, link_local_v6];
let mut addrs = original.clone();
drop_non_global_direct_candidates_when_global_present(&mut addrs);
assert_eq!(
addrs, original,
"LAN-only candidate sets must not be emptied — the caller would have nothing to dial"
);
}
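// Relay-target selection: a listener-port address wins over an observed
// ephemeral port on the same host, and a known-reachable address wins over
// an externally observed one.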
#[test]
fn test_select_preferred_relay_target_addr_prefers_listener_port() {
let listener: SocketAddr = "[2001:db8::20]:5483".parse().expect("valid addr");
let observed_ephemeral: SocketAddr = "[2001:db8::20]:37616".parse().expect("valid addr");
let selected = select_preferred_relay_target_addr(
&[listener],
&[],
&[observed_ephemeral],
Some(observed_ephemeral),
None,
);
assert_eq!(selected, Some(listener));
}
#[test]
fn test_select_preferred_relay_target_addr_prefers_reachable_over_external() {
let reachable: SocketAddr = "198.51.100.20:5483".parse().expect("valid addr");
let observed_ephemeral: SocketAddr = "198.51.100.20:37616".parse().expect("valid addr");
let selected = select_preferred_relay_target_addr(
&[],
&[reachable],
&[observed_ephemeral],
Some(observed_ephemeral),
None,
);
assert_eq!(selected, Some(reachable));
}
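// Direct-reachability persistence: only TraversalMethod::Direct connections
// are recorded in the bootstrap cache; hole-punched peers are skipped,
// presumably because traversal success does not prove the remote address is
// directly dialable.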
#[tokio::test]
async fn test_persist_direct_reachability_if_applicable_skips_hole_punch() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x33; 32]);
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp("198.51.100.33:5483".parse().expect("valid addr")),
traversal_method: TraversalMethod::HolePunch,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
P2pEndpoint::persist_direct_peer_reachability_if_applicable(
endpoint.bootstrap_cache.as_ref(),
&peer_conn,
)
.await;
assert!(endpoint.bootstrap_cache.get_peer(&peer_id).await.is_none());
endpoint.shutdown().await;
}
#[tokio::test]
async fn test_persist_direct_reachability_if_applicable_records_direct() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x34; 32]);
let peer_conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp("198.51.100.34:5483".parse().expect("valid addr")),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
P2pEndpoint::persist_direct_peer_reachability_if_applicable(
endpoint.bootstrap_cache.as_ref(),
&peer_conn,
)
.await;
let cached_peer = endpoint
.bootstrap_cache
.get_peer(&peer_id)
.await
.expect("direct peer should be cached");
assert_eq!(
cached_peer.capabilities.direct_reachability_scope,
Some(ReachabilityScope::Global)
);
endpoint.shutdown().await;
}
#[tokio::test]
async fn test_peer_address_update_persists_hints_and_cache() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x44; 32]);
let peer_addr: SocketAddr = "127.0.0.1:45000".parse().expect("valid addr");
let advertised_addr: SocketAddr = "198.51.100.44:5483".parse().expect("valid addr");
let mut events = endpoint.subscribe();
endpoint
.register_connected_peer(PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(peer_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Server,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
})
.await;
P2pEndpoint::apply_peer_address_update(
endpoint.connected_peers.as_ref(),
endpoint.bootstrap_cache.as_ref(),
endpoint.peer_hint_records.as_ref(),
&endpoint.event_tx,
peer_addr,
advertised_addr,
)
.await;
assert!(
endpoint
.hinted_addrs_for_peer(peer_id)
.await
.contains(&advertised_addr)
);
let cached_peer = endpoint
.bootstrap_cache
.get_peer(&peer_id)
.await
.expect("peer should be cached");
assert!(cached_peer.preferred_addresses().contains(&advertised_addr));
let observed_events = collect_broadcast_events(&mut events);
assert!(observed_events.iter().any(|event| matches!(
event,
P2pEvent::PeerAddressUpdated {
peer_addr: observed_peer_addr,
advertised_addr: observed_advertised_addr,
} if *observed_peer_addr == peer_addr && *observed_advertised_addr == advertised_addr
)));
endpoint.shutdown().await;
}
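// Peer-hint plumbing: upsert_peer_hints feeds coordinator candidate
// selection, persists relay capability into the bootstrap cache (surviving
// a clear of the in-memory runtime hints), and merges addresses and role
// flags across repeated calls for the same peer.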
#[tokio::test]
async fn test_upsert_peer_hints_feeds_coordinator_candidates() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x5a; 32]);
let hinted_addr: SocketAddr = "127.0.0.1:9000".parse().expect("valid addr");
let caps = PeerCapabilities {
supports_coordination: true,
..PeerCapabilities::default()
};
endpoint
.upsert_peer_hints(peer_id, vec![hinted_addr], Some(caps))
.await;
let candidates = endpoint.coordinator_candidates().await;
assert!(
candidates.contains(&hinted_addr),
"hinted coordinator address should be considered for orchestration"
);
endpoint.shutdown().await;
}
#[tokio::test]
async fn test_upsert_peer_hints_feeds_relay_cache_selection_after_runtime_hints_clear() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x6b; 32]);
let hinted_addr: SocketAddr = "198.51.100.61:9000".parse().expect("valid addr");
let target_addr: SocketAddr = "203.0.113.61:443".parse().expect("valid addr");
let caps = PeerCapabilities {
supports_relay: true,
..PeerCapabilities::default()
};
endpoint
.upsert_peer_hints(peer_id, vec![hinted_addr], Some(caps))
.await;
endpoint.peer_hint_records.write().await.clear();
let cached = endpoint
.bootstrap_cache
.get(&peer_id)
.await
.expect("cached hinted peer should exist");
assert!(cached.capabilities.hinted_supports_relay);
assert!(cached.capabilities.supports_relay);
let relays = endpoint
.bootstrap_cache
.select_relays_for_target(4, &target_addr, false)
.await;
assert!(
relays.iter().any(|peer| peer.peer_id == peer_id),
"persisted relay hint should feed bootstrap-cache relay selection"
);
endpoint.shutdown().await;
}
#[tokio::test]
async fn test_upsert_peer_hints_merge_addrs_and_roles() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let peer_id = PeerId([0x7c; 32]);
let addr_a: SocketAddr = "198.51.100.71:9000".parse().expect("valid addr");
let addr_b: SocketAddr = "198.51.100.72:9000".parse().expect("valid addr");
endpoint
.upsert_peer_hints(
peer_id,
vec![addr_a],
Some(PeerCapabilities {
supports_coordination: true,
..PeerCapabilities::default()
}),
)
.await;
endpoint
.upsert_peer_hints(
peer_id,
vec![addr_a, addr_b],
Some(PeerCapabilities {
supports_relay: true,
..PeerCapabilities::default()
}),
)
.await;
let hints = endpoint.peer_hint_records.read().await;
let runtime = hints.get(&peer_id).expect("runtime hints should exist");
assert_eq!(runtime.addrs.len(), 2);
assert!(runtime.addrs.contains(&addr_a));
assert!(runtime.addrs.contains(&addr_b));
assert!(runtime.capabilities.supports_relay);
assert!(runtime.capabilities.supports_coordination);
drop(hints);
let cached = endpoint
.bootstrap_cache
.get(&peer_id)
.await
.expect("cached hinted peer should exist");
assert_eq!(cached.addresses.len(), 2);
assert!(cached.addresses.contains(&addr_a));
assert!(cached.addresses.contains(&addr_b));
assert!(cached.capabilities.supports_relay);
assert!(cached.capabilities.supports_coordination);
assert!(cached.capabilities.hinted_supports_relay);
assert!(cached.capabilities.hinted_supports_coordination);
endpoint.shutdown().await;
}
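/// Builds an eligible mDNS peer record in the "workspace-a" namespace; this
/// is the shared fixture for the mDNS behaviour tests below.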
fn mdns_peer_record(addr: SocketAddr, claimed_peer_id: PeerId) -> MdnsPeerRecord {
MdnsPeerRecord {
service: "ant-quic".to_string(),
fullname: format!(
"peer-{}._ant-quic._udp.local.",
hex::encode(&claimed_peer_id.0[..4])
),
hostname: "peer.local.".to_string(),
namespace: Some("workspace-a".to_string()),
claimed_peer_id: Some(claimed_peer_id),
addresses: vec![addr],
metadata: std::collections::BTreeMap::from([
("namespace".to_string(), "workspace-a".to_string()),
("peer_id".to_string(), hex::encode(claimed_peer_id.0)),
]),
eligible: true,
ineligible_reason: None,
}
}
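// mDNS auto-connect policy matrix: Disabled only surfaces discoveries,
// ApprovalRequired emits MdnsPeerApprovalRequired without dialing, an
// allowlist rejects unapproved peers before any dial is scheduled, and
// Enabled dials but records the authenticated identity rather than the
// claimed one.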
#[tokio::test]
async fn test_mdns_discover_only_surfaces_without_auto_connecting() {
let node_b = crate::Node::bind(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.await
.expect("node_b should bind");
let endpoint_a = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.mdns(crate::unified_config::MdnsConfig {
enabled: true,
service: Some("ant-quic".to_string()),
namespace: Some("workspace-a".to_string()),
mode: crate::unified_config::MdnsMode::BrowseOnly,
auto_connect: crate::unified_config::AutoConnectPolicy::Disabled,
metadata: std::collections::BTreeMap::new(),
})
.build()
.expect("config should build"),
)
.await
.expect("endpoint_a should bind");
let addr_b = localhost_addr(node_b.local_addr().expect("node_b addr"));
endpoint_a.apply_mdns_runtime_event(MdnsRuntimeEvent::PeerEligible(mdns_peer_record(
addr_b,
node_b.peer_id(),
)));
tokio::time::sleep(Duration::from_millis(300)).await;
assert_eq!(endpoint_a.connected_peers().await.len(), 0);
assert_eq!(endpoint_a.mdns_snapshot().discovered_peers.len(), 1);
endpoint_a.shutdown().await;
node_b.shutdown().await;
}
#[tokio::test]
async fn test_mdns_approval_required_surfaces_without_auto_connecting() {
let node_b = crate::Node::bind(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.await
.expect("node_b should bind");
let endpoint_a = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.mdns(crate::unified_config::MdnsConfig {
enabled: true,
service: Some("ant-quic".to_string()),
namespace: Some("workspace-a".to_string()),
mode: crate::unified_config::MdnsMode::BrowseOnly,
auto_connect: crate::unified_config::AutoConnectPolicy::ApprovalRequired,
metadata: std::collections::BTreeMap::new(),
})
.build()
.expect("config should build"),
)
.await
.expect("endpoint_a should bind");
let mut events = endpoint_a.subscribe();
let addr_b = localhost_addr(node_b.local_addr().expect("node_b addr"));
endpoint_a.apply_mdns_runtime_event(MdnsRuntimeEvent::PeerEligible(mdns_peer_record(
addr_b,
node_b.peer_id(),
)));
tokio::time::sleep(Duration::from_millis(300)).await;
assert_eq!(endpoint_a.connected_peers().await.len(), 0);
let collected = collect_broadcast_events(&mut events);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::MdnsPeerApprovalRequired { peer, .. } if peer.claimed_peer_id == Some(node_b.peer_id())
)));
endpoint_a.shutdown().await;
node_b.shutdown().await;
}
#[tokio::test]
async fn test_mdns_allowlist_rejects_unapproved_peer_before_auto_connect() {
let node_b = crate::Node::bind(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.await
.expect("node_b should bind");
let allowed_peer = PeerId([0xac; 32]);
let endpoint_a = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.allow_discovered_peer(allowed_peer)
.mdns(crate::unified_config::MdnsConfig {
enabled: true,
service: Some("ant-quic".to_string()),
namespace: Some("workspace-a".to_string()),
mode: crate::unified_config::MdnsMode::BrowseOnly,
auto_connect: crate::unified_config::AutoConnectPolicy::Enabled,
metadata: std::collections::BTreeMap::new(),
})
.build()
.expect("config should build"),
)
.await
.expect("endpoint_a should bind");
let mut events = endpoint_a.subscribe();
let addr_b = localhost_addr(node_b.local_addr().expect("node_b addr"));
endpoint_a.apply_mdns_runtime_event(MdnsRuntimeEvent::PeerEligible(mdns_peer_record(
addr_b,
node_b.peer_id(),
)));
tokio::time::sleep(Duration::from_millis(300)).await;
assert_eq!(endpoint_a.connected_peers().await.len(), 0);
let collected = collect_broadcast_events(&mut events);
assert!(collected.iter().any(|event| matches!(
event,
P2pEvent::MdnsPeerIneligible { peer, reason }
if peer.claimed_peer_id == Some(node_b.peer_id())
&& reason.contains("not in the discovery allowlist")
)));
assert!(
!collected
.iter()
.any(|event| matches!(event, P2pEvent::MdnsAutoConnectAttempted { .. })),
"allowlist rejection should happen before scheduling auto-connect"
);
endpoint_a.shutdown().await;
node_b.shutdown().await;
}
#[tokio::test]
async fn test_mdns_skips_loopback_bind_hints() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
let mdns = endpoint.mdns_snapshot();
assert!(
!mdns.browsing,
"loopback-only bind hints must suppress background mDNS browsing"
);
assert!(
!mdns.advertising,
"loopback-only bind hints must suppress background mDNS advertising"
);
endpoint.shutdown().await;
}
#[cfg(all(feature = "platform-verifier", feature = "network-discovery"))]
#[tokio::test]
async fn test_mdns_auto_connect_succeeds_without_overriding_authenticated_identity() {
let node_b = crate::Node::bind(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.await
.expect("node_b should bind");
let endpoint_a = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.mdns(crate::unified_config::MdnsConfig {
enabled: true,
service: Some("ant-quic".to_string()),
namespace: Some("workspace-a".to_string()),
mode: crate::unified_config::MdnsMode::BrowseOnly,
auto_connect: crate::unified_config::AutoConnectPolicy::Enabled,
metadata: std::collections::BTreeMap::new(),
})
.build()
.expect("config should build"),
)
.await
.expect("endpoint_a should bind");
let mut events = endpoint_a.subscribe();
let accept_handle = tokio::spawn({
let node = node_b.clone();
async move {
let _ = tokio::time::timeout(Duration::from_secs(20), node.accept()).await;
}
});
let fake_claim = PeerId([0xee; 32]);
let addr_b = localhost_addr(node_b.local_addr().expect("node_b addr"));
endpoint_a.apply_mdns_runtime_event(MdnsRuntimeEvent::PeerEligible(mdns_peer_record(
addr_b, fake_claim,
)));
let success = tokio::time::timeout(Duration::from_secs(20), async {
loop {
if let P2pEvent::MdnsAutoConnectSucceeded {
authenticated_peer_id,
..
} = events.recv().await.expect("event should arrive")
{
break authenticated_peer_id;
}
}
})
.await
.expect("mDNS auto-connect success event should arrive");
assert_eq!(success, node_b.peer_id());
assert_ne!(success, fake_claim);
assert_eq!(endpoint_a.connected_peers().await.len(), 1);
endpoint_a.shutdown().await;
node_b.shutdown().await;
let _ = accept_handle.await;
}
#[cfg(all(feature = "platform-verifier", feature = "network-discovery"))]
#[tokio::test]
async fn test_mdns_discovered_peer_coexists_with_static_known_peer_dedup() {
let node_b = crate::Node::bind(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.await
.expect("node_b should bind");
let addr_b = localhost_addr(node_b.local_addr().expect("node_b addr"));
let endpoint_a = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.known_peer(addr_b)
.port_mapping_enabled(false)
.mdns(crate::unified_config::MdnsConfig {
enabled: true,
service: Some("ant-quic".to_string()),
namespace: Some("workspace-a".to_string()),
mode: crate::unified_config::MdnsMode::BrowseOnly,
auto_connect: crate::unified_config::AutoConnectPolicy::Enabled,
metadata: std::collections::BTreeMap::new(),
})
.build()
.expect("config should build"),
)
.await
.expect("endpoint_a should bind");
let accept_handle = tokio::spawn({
let node = node_b.clone();
async move {
for _ in 0..2 {
let _ = tokio::time::timeout(Duration::from_secs(20), node.accept()).await;
}
}
});
endpoint_a.apply_mdns_runtime_event(MdnsRuntimeEvent::PeerDiscovered(mdns_peer_record(
addr_b,
node_b.peer_id(),
)));
let connected =
tokio::time::timeout(Duration::from_secs(20), endpoint_a.connect_known_peers())
.await
.expect("connect_known_peers should not time out")
.expect("connect_known_peers should succeed");
assert_eq!(connected, 1);
assert_eq!(endpoint_a.connected_peers().await.len(), 1);
endpoint_a.shutdown().await;
node_b.shutdown().await;
let _ = accept_handle.await;
}
#[tokio::test]
async fn test_mdns_shutdown_is_idempotent() {
let endpoint = P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(
IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
0,
))
.port_mapping_enabled(false)
.mdns(crate::unified_config::MdnsConfig {
enabled: true,
service: Some("ant-quic".to_string()),
namespace: Some("workspace-a".to_string()),
mode: crate::unified_config::MdnsMode::Both,
auto_connect: crate::unified_config::AutoConnectPolicy::Disabled,
metadata: std::collections::BTreeMap::new(),
})
.build()
.expect("config should build"),
)
.await
.expect("endpoint should bind");
endpoint.shutdown().await;
endpoint.shutdown().await;
assert!(!endpoint.is_running());
}
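// The remaining tests are pure data-shape checks on P2pEvent,
// PeerConnection, and TransportAddr; none of them needs a running endpoint.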
#[test]
fn test_peer_connected_event_with_udp() {
let socket_addr: SocketAddr = "192.168.1.100:8080".parse().expect("valid addr");
let event = P2pEvent::PeerConnected {
peer_id: PeerId([0xab; 32]),
addr: TransportAddr::Udp(socket_addr),
side: Side::Client,
traversal_method: TraversalMethod::Direct,
};
if let P2pEvent::PeerConnected {
peer_id,
addr,
side,
traversal_method,
} = event
{
assert_eq!(peer_id.0, [0xab; 32]);
assert_eq!(addr, TransportAddr::Udp(socket_addr));
assert!(side.is_client());
assert_eq!(traversal_method, TraversalMethod::Direct);
let extracted = addr.as_socket_addr();
assert_eq!(extracted, Some(socket_addr));
} else {
panic!("Expected PeerConnected event");
}
}
#[test]
fn test_peer_connected_event_with_ble() {
let device_id = [0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc];
let event = P2pEvent::PeerConnected {
peer_id: PeerId([0xcd; 32]),
addr: TransportAddr::Ble {
device_id,
service_uuid: None,
},
side: Side::Server,
traversal_method: TraversalMethod::Direct,
};
if let P2pEvent::PeerConnected {
peer_id,
addr,
side,
traversal_method,
} = event
{
assert_eq!(peer_id.0, [0xcd; 32]);
assert!(side.is_server());
assert_eq!(traversal_method, TraversalMethod::Direct);
assert!(addr.as_socket_addr().is_none());
if let TransportAddr::Ble {
device_id: mac,
service_uuid,
} = addr
{
assert_eq!(mac, device_id);
assert!(service_uuid.is_none());
} else {
panic!("Expected BLE address");
}
}
}
#[test]
fn test_external_address_discovered_udp() {
let socket_addr: SocketAddr = "203.0.113.1:12345".parse().expect("valid addr");
let event = P2pEvent::ExternalAddressDiscovered {
addr: TransportAddr::Udp(socket_addr),
};
if let P2pEvent::ExternalAddressDiscovered { addr } = event {
assert_eq!(addr, TransportAddr::Udp(socket_addr));
assert_eq!(addr.as_socket_addr(), Some(socket_addr));
} else {
panic!("Expected ExternalAddressDiscovered event");
}
}
#[test]
fn test_event_clone() {
let socket_addr: SocketAddr = "10.0.0.1:9000".parse().expect("valid addr");
let event = P2pEvent::PeerConnected {
peer_id: PeerId([0x11; 32]),
addr: TransportAddr::Udp(socket_addr),
side: Side::Client,
traversal_method: TraversalMethod::Direct,
};
let cloned = event.clone();
if let (
P2pEvent::PeerConnected {
peer_id: p1,
addr: a1,
..
},
P2pEvent::PeerConnected {
peer_id: p2,
addr: a2,
..
},
) = (&event, &cloned)
{
assert_eq!(p1.0, p2.0);
assert_eq!(a1, a2);
}
}
#[test]
fn test_peer_connection_with_transport_addr() {
let udp_addr: SocketAddr = "127.0.0.1:8080".parse().expect("valid addr");
let udp_conn = PeerConnection {
peer_id: PeerId([0u8; 32]),
remote_addr: TransportAddr::Udp(udp_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
assert_eq!(
udp_conn.remote_addr.as_socket_addr(),
Some(udp_addr),
"UDP connection should have extractable socket address"
);
let device_id = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66];
let ble_conn = PeerConnection {
peer_id: PeerId([1u8; 32]),
remote_addr: TransportAddr::Ble {
device_id,
service_uuid: None,
},
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
assert!(
ble_conn.remote_addr.as_socket_addr().is_none(),
"BLE connection should not have socket address"
);
}
#[test]
fn test_transport_addr_display_in_events() {
let socket_addr: SocketAddr = "192.168.1.1:9001".parse().expect("valid addr");
let event = P2pEvent::PeerConnected {
peer_id: PeerId([0xff; 32]),
addr: TransportAddr::Udp(socket_addr),
side: Side::Client,
traversal_method: TraversalMethod::Direct,
};
let debug_str = format!("{:?}", event);
assert!(
debug_str.contains("192.168.1.1"),
"Event debug should contain IP address"
);
assert!(
debug_str.contains("9001"),
"Event debug should contain port"
);
}
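// Connection-tracking tests model the peer table as a plain
// HashMap<PeerId, PeerConnection>, mixing UDP and BLE entries to check
// address extraction and lookup by TransportAddr across transport kinds.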
#[test]
fn test_connection_tracking_udp() {
use std::collections::HashMap;
let mut connections: HashMap<PeerId, PeerConnection> = HashMap::new();
let socket_addr: SocketAddr = "10.0.0.1:8080".parse().expect("valid addr");
let peer_id = PeerId([0x01; 32]);
let conn = PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(socket_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
connections.insert(peer_id, conn.clone());
assert!(connections.contains_key(&peer_id));
let retrieved = connections.get(&peer_id).expect("connection exists");
assert_eq!(retrieved.remote_addr, TransportAddr::Udp(socket_addr));
assert!(retrieved.authenticated);
}
#[test]
fn test_connection_tracking_multi_transport() {
use std::collections::HashMap;
let mut connections: HashMap<PeerId, PeerConnection> = HashMap::new();
let udp_addr: SocketAddr = "192.168.1.100:9000".parse().expect("valid addr");
let peer1 = PeerId([0x01; 32]);
connections.insert(
peer1,
PeerConnection {
peer_id: peer1,
remote_addr: TransportAddr::Udp(udp_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
},
);
let peer2 = PeerId([0x02; 32]);
let ble_device = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff];
connections.insert(
peer2,
PeerConnection {
peer_id: peer2,
remote_addr: TransportAddr::Ble {
device_id: ble_device,
service_uuid: None,
},
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
},
);
assert_eq!(connections.len(), 2);
assert!(
connections
.get(&peer1)
.unwrap()
.remote_addr
.as_socket_addr()
.is_some()
);
assert!(
connections
.get(&peer2)
.unwrap()
.remote_addr
.as_socket_addr()
.is_none()
);
}
#[test]
fn test_connection_lookup_by_transport_addr() {
use std::collections::HashMap;
let mut connections: HashMap<PeerId, PeerConnection> = HashMap::new();
let addrs = [
("10.0.0.1:8080", [0x01; 32]),
("10.0.0.2:8080", [0x02; 32]),
("10.0.0.3:8080", [0x03; 32]),
];
for (addr_str, peer_bytes) in addrs {
let socket_addr: SocketAddr = addr_str.parse().expect("valid addr");
let peer_id = PeerId(peer_bytes);
connections.insert(
peer_id,
PeerConnection {
peer_id,
remote_addr: TransportAddr::Udp(socket_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
},
);
}
let target: SocketAddr = "10.0.0.2:8080".parse().expect("valid addr");
let target_addr = TransportAddr::Udp(target);
let found = connections.values().find(|c| c.remote_addr == target_addr);
assert!(found.is_some());
assert_eq!(found.unwrap().peer_id.0, [0x02; 32]);
}
#[test]
fn test_transport_addr_equality_in_tracking() {
let addr1: SocketAddr = "192.168.1.1:8080".parse().expect("valid addr");
let addr2: SocketAddr = "192.168.1.1:8080".parse().expect("valid addr");
let addr3: SocketAddr = "192.168.1.1:8081".parse().expect("valid addr");
let t1 = TransportAddr::Udp(addr1);
let t2 = TransportAddr::Udp(addr2);
let t3 = TransportAddr::Udp(addr3);
assert_eq!(t1, t2);
assert_ne!(t1, t3);
let ble = TransportAddr::Ble {
device_id: [0; 6],
service_uuid: None,
};
assert_ne!(t1, ble);
}
#[test]
fn test_peer_connection_update_preserves_transport_addr() {
let socket_addr: SocketAddr = "172.16.0.1:5000".parse().expect("valid addr");
let mut conn = PeerConnection {
peer_id: PeerId([0xaa; 32]),
remote_addr: TransportAddr::Udp(socket_addr),
traversal_method: TraversalMethod::Direct,
side: Side::Client,
authenticated: false,
connected_at: Instant::now(),
last_activity: Instant::now(),
};
conn.authenticated = true;
conn.last_activity = Instant::now();
assert_eq!(conn.remote_addr, TransportAddr::Udp(socket_addr));
assert!(conn.authenticated);
}
}