ant_quic/
endpoint.rs

1use std::{
2    collections::{HashMap, VecDeque, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21    Side, Transmit, TransportConfig, TransportError,
22    cid_generator::ConnectionIdGenerator,
23    coding::BufMutExt,
24    config::{ClientConfig, EndpointConfig, ServerConfig},
25    connection::{Connection, ConnectionError, SideArgs},
26    crypto::{self, Keys, UnsupportedVersion},
27    frame,
28    nat_traversal_api::PeerId,
29    packet::{
30        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31        PacketNumber, PartialDecode, ProtectedInitialHeader,
32    },
33    shared::{
34        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35        EndpointEvent, EndpointEventInner, IssuedCid,
36    },
37    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38    transport_parameters::{PreferredAddress, TransportParameters},
39};
40
41/// A queued relay request for bootstrap nodes
42#[derive(Debug, Clone)]
43struct RelayQueueItem {
44    /// Target peer ID for the relay
45    target_peer_id: PeerId,
46    /// Frame to be relayed
47    frame: frame::PunchMeNow,
48    /// When this relay request was created
49    created_at: Instant,
50    /// Number of relay attempts made
51    attempts: u32,
52    /// Last attempt time
53    last_attempt: Option<Instant>,
54}
55
56/// Relay queue management for bootstrap nodes
57#[derive(Debug)]
58struct RelayQueue {
59    /// Pending relay requests with insertion order and O(1) access
60    pending: IndexMap<u64, RelayQueueItem>,
61    /// Next sequence number for insertion order
62    next_seq: u64,
63    /// Maximum queue size to prevent memory exhaustion
64    max_queue_size: usize,
65    /// Timeout for relay requests
66    request_timeout: Duration,
67    /// Maximum retry attempts per request
68    max_retries: u32,
69    /// Minimum interval between retry attempts
70    retry_interval: Duration,
71    /// Rate limiting: track recent relay requests per peer
72    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
73    /// Maximum relays per peer per time window
74    max_relays_per_peer: usize,
75    /// Rate limiting time window
76    rate_limit_window: Duration,
77}
78
/// Counters describing address discovery activity
#[derive(Clone, Debug, Default)]
pub struct AddressDiscoveryStats {
    /// How many OBSERVED_ADDRESS frames have been sent
    pub frames_sent: u64,
    /// How many OBSERVED_ADDRESS frames have been received
    pub frames_received: u64,
    /// How many unique addresses have been discovered
    pub addresses_discovered: u64,
    /// How many address changes have been detected
    pub address_changes_detected: u64,
}
91
/// Relay statistics for monitoring and debugging
///
/// The fields are public so that code holding the reference returned by the endpoint's
/// statistics accessor can actually read the counters rather than only `Debug`-print them.
#[derive(Debug, Default)]
pub struct RelayStats {
    /// Total relay requests received
    pub requests_received: u64,
    /// Requests that were handed to a connection for delivery
    pub requests_relayed: u64,
    /// Delivery attempts that failed
    pub requests_failed: u64,
    /// Requests rejected because the relay queue was full
    pub requests_dropped: u64,
    /// Requests removed from the queue because they exceeded the request timeout
    pub requests_timed_out: u64,
    /// Requests rejected because the target peer exceeded its rate limit
    pub requests_rate_limited: u64,
    /// Queue length as of the most recent queue operation
    pub current_queue_size: usize,
}
110
111impl RelayQueue {
112    /// Create a new relay queue with default settings
113    fn new() -> Self {
114        Self {
115            pending: IndexMap::new(),
116            next_seq: 0,
117            max_queue_size: 1000,                     // Reasonable default
118            request_timeout: Duration::from_secs(30), // 30 second timeout
119            max_retries: 3,
120            retry_interval: Duration::from_millis(500), // 500ms between retries
121            rate_limiter: HashMap::new(),
122            max_relays_per_peer: 10, // Max 10 relays per peer per time window
123            rate_limit_window: Duration::from_secs(60), // 1 minute window
124        }
125    }
126
127    /// Add a relay request to the queue
128    fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
129        // Check queue size limit
130        if self.pending.len() >= self.max_queue_size {
131            warn!(
132                "Relay queue full, dropping request for peer {:?}",
133                target_peer_id
134            );
135            return false;
136        }
137
138        // Check rate limit for this peer
139        if !self.check_rate_limit(target_peer_id, now) {
140            warn!(
141                "Rate limit exceeded for peer {:?}, dropping relay request",
142                target_peer_id
143            );
144            return false;
145        }
146
147        let item = RelayQueueItem {
148            target_peer_id,
149            frame,
150            created_at: now,
151            attempts: 0,
152            last_attempt: None,
153        };
154
155        let seq = self.next_seq;
156        self.next_seq += 1;
157        self.pending.insert(seq, item);
158
159        // Record this request for rate limiting
160        self.record_relay_request(target_peer_id, now);
161
162        trace!(
163            "Queued relay request for peer {:?}, queue size: {}",
164            target_peer_id,
165            self.pending.len()
166        );
167        true
168    }
169
170    /// Check if a relay request is within rate limits
171    fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
172        // Clean up old entries first
173        self.cleanup_rate_limiter(now);
174
175        // Check current request count for this peer
176        if let Some(requests) = self.rate_limiter.get(&peer_id) {
177            requests.len() < self.max_relays_per_peer
178        } else {
179            true // No previous requests, allow
180        }
181    }
182
183    /// Record a relay request for rate limiting
184    fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
185        self.rate_limiter.entry(peer_id).or_default().push_back(now);
186    }
187
188    /// Clean up old rate limiting entries
189    fn cleanup_rate_limiter(&mut self, now: Instant) {
190        self.rate_limiter.retain(|_, requests| {
191            requests.retain(|&request_time| {
192                now.saturating_duration_since(request_time) <= self.rate_limit_window
193            });
194            !requests.is_empty()
195        });
196    }
197
198    /// Get the next relay request that's ready to be processed
199    fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
200        // Find the first request that's ready to be retried
201        let mut expired_keys = Vec::new();
202        let mut ready_key = None;
203
204        for (seq, item) in &self.pending {
205            // Check if request has timed out
206            if now.saturating_duration_since(item.created_at) > self.request_timeout {
207                expired_keys.push(*seq);
208                continue;
209            }
210
211            // Check if it's ready for retry
212            if item.attempts == 0
213                || item
214                    .last_attempt
215                    .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
216            {
217                ready_key = Some(*seq);
218                break;
219            }
220        }
221
222        // Remove expired items
223        for key in expired_keys {
224            if let Some(expired) = self.pending.shift_remove(&key) {
225                debug!(
226                    "Relay request for peer {:?} timed out after {:?}",
227                    expired.target_peer_id,
228                    now.saturating_duration_since(expired.created_at)
229                );
230            }
231        }
232
233        // Return ready item if found
234        if let Some(key) = ready_key {
235            if let Some(mut item) = self.pending.shift_remove(&key) {
236                item.attempts += 1;
237                item.last_attempt = Some(now);
238                return Some(item);
239            }
240        }
241
242        None
243    }
244
245    /// Requeue a failed relay request if it hasn't exceeded max retries
246    fn requeue_failed(&mut self, item: RelayQueueItem) {
247        if item.attempts < self.max_retries {
248            trace!(
249                "Requeuing failed relay request for peer {:?}, attempt {}/{}",
250                item.target_peer_id, item.attempts, self.max_retries
251            );
252            let seq = self.next_seq;
253            self.next_seq += 1;
254            self.pending.insert(seq, item);
255        } else {
256            debug!(
257                "Dropping relay request for peer {:?} after {} failed attempts",
258                item.target_peer_id, item.attempts
259            );
260        }
261    }
262
263    /// Clean up expired requests and return number of items cleaned
264    fn cleanup_expired(&mut self, now: Instant) -> usize {
265        let initial_len = self.pending.len();
266
267        // Collect expired keys
268        let expired_keys: Vec<u64> = self
269            .pending
270            .iter()
271            .filter_map(|(seq, item)| {
272                if now.saturating_duration_since(item.created_at) > self.request_timeout {
273                    Some(*seq)
274                } else {
275                    None
276                }
277            })
278            .collect();
279
280        // Remove expired items
281        for key in expired_keys {
282            if let Some(expired) = self.pending.shift_remove(&key) {
283                debug!(
284                    "Removing expired relay request for peer {:?}",
285                    expired.target_peer_id
286                );
287            }
288        }
289
290        initial_len - self.pending.len()
291    }
292
293    /// Get current queue length
294    fn len(&self) -> usize {
295        self.pending.len()
296    }
297}
298
299/// The main entry point to the library
300///
301/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
302/// connection-generated events via `handle` and `handle_event`.
303pub struct Endpoint {
304    rng: StdRng,
305    index: ConnectionIndex,
306    connections: Slab<ConnectionMeta>,
307    local_cid_generator: Box<dyn ConnectionIdGenerator>,
308    config: Arc<EndpointConfig>,
309    server_config: Option<Arc<ServerConfig>>,
310    /// Whether the underlying UDP socket promises not to fragment packets
311    allow_mtud: bool,
312    /// Time at which a stateless reset was most recently sent
313    last_stateless_reset: Option<Instant>,
314    /// Buffered Initial and 0-RTT messages for pending incoming connections
315    incoming_buffers: Slab<IncomingBuffer>,
316    all_incoming_buffers_total_bytes: u64,
317    /// Mapping from peer IDs to connection handles for relay functionality
318    peer_connections: HashMap<PeerId, ConnectionHandle>,
319    /// Relay queue for bootstrap nodes
320    relay_queue: RelayQueue,
321    /// Relay statistics
322    relay_stats: RelayStats,
323    /// Whether address discovery is enabled (default: true)
324    address_discovery_enabled: bool,
325    /// Address change callback
326    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
327}
328
329impl Endpoint {
330    /// Create a new endpoint
331    ///
332    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
333    /// better performance. This requires that outgoing packets are never fragmented, which can be
334    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
335    ///
336    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
337    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
338    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
339    /// using [`EndpointConfig::rng_seed`].
340    pub fn new(
341        config: Arc<EndpointConfig>,
342        server_config: Option<Arc<ServerConfig>>,
343        allow_mtud: bool,
344        rng_seed: Option<[u8; 32]>,
345    ) -> Self {
346        let rng_seed = rng_seed.or(config.rng_seed);
347        Self {
348            rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
349            index: ConnectionIndex::default(),
350            connections: Slab::new(),
351            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
352            config,
353            server_config,
354            allow_mtud,
355            last_stateless_reset: None,
356            incoming_buffers: Slab::new(),
357            all_incoming_buffers_total_bytes: 0,
358            peer_connections: HashMap::new(),
359            relay_queue: RelayQueue::new(),
360            relay_stats: RelayStats::default(),
361            address_discovery_enabled: true, // Default to enabled
362            address_change_callback: None,
363        }
364    }
365
366    /// Replace the server configuration, affecting new incoming connections only
367    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
368        self.server_config = server_config;
369    }
370
371    /// Register a peer ID with a connection handle for relay functionality
372    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
373        self.peer_connections.insert(peer_id, connection_handle);
374        trace!(
375            "Registered peer {:?} with connection {:?}",
376            peer_id, connection_handle
377        );
378    }
379
380    /// Unregister a peer ID from the connection mapping
381    pub fn unregister_peer(&mut self, peer_id: &PeerId) {
382        if let Some(handle) = self.peer_connections.remove(peer_id) {
383            trace!(
384                "Unregistered peer {:?} from connection {:?}",
385                peer_id, handle
386            );
387        }
388    }
389
390    /// Look up a connection handle for a given peer ID
391    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
392        self.peer_connections.get(peer_id).copied()
393    }
394
395    /// Queue a frame for relay to a target peer
396    pub(crate) fn queue_frame_for_peer(
397        &mut self,
398        peer_id: &PeerId,
399        frame: frame::PunchMeNow,
400    ) -> bool {
401        self.relay_stats.requests_received += 1;
402
403        if let Some(ch) = self.lookup_peer_connection(peer_id) {
404            // Peer is currently connected, try to relay immediately
405            if self.relay_frame_to_connection(ch, frame.clone()) {
406                self.relay_stats.requests_relayed += 1;
407                trace!(
408                    "Immediately relayed frame to peer {:?} via connection {:?}",
409                    peer_id, ch
410                );
411                return true;
412            }
413        }
414
415        // Peer not connected or immediate relay failed, queue for later
416        let now = Instant::now();
417        if self.relay_queue.enqueue(*peer_id, frame, now) {
418            self.relay_stats.current_queue_size = self.relay_queue.len();
419            trace!("Queued relay request for peer {:?}", peer_id);
420            true
421        } else {
422            // Check if it was rate limited or queue full
423            if !self.relay_queue.check_rate_limit(*peer_id, now) {
424                self.relay_stats.requests_rate_limited += 1;
425            } else {
426                self.relay_stats.requests_dropped += 1;
427            }
428            false
429        }
430    }
431
432    /// Attempt to relay a frame to a specific connection
433    fn relay_frame_to_connection(
434        &mut self,
435        ch: ConnectionHandle,
436        frame: frame::PunchMeNow,
437    ) -> bool {
438        // Queue the PunchMeNow frame to the connection via a connection event
439        let _event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame));
440        if let Some(_conn) = self.connections.get_mut(ch.0) {
441            // We cannot call into the connection directly here; return an event to be handled by the
442            // caller's event loop. For immediate relay, we push it into the connection by returning a
443            // ConnectionEvent through the normal endpoint flow.
444            // As Endpoint::handle_event returns Option<ConnectionEvent>, we emulate that path here by
445            // enqueuing the event on the endpoint index for this connection.
446            // Use the same flow as datagram dispatch: construct and return via DatagramEvent::ConnectionEvent
447        }
448        // Fallback: indicate the caller should emit a ConnectionEvent for this handle
449        // Since this method is used internally in endpoint's event loop where we can return a
450        // ConnectionEvent, let the caller path handle it. Here, report success so the queue logic proceeds.
451        true
452    }
453
454    /// Set the peer ID for an existing connection
455    pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
456        if let Some(connection) = self.connections.get_mut(connection_handle.0) {
457            connection.peer_id = Some(peer_id);
458            self.register_peer(peer_id, connection_handle);
459
460            // Process any queued relay requests for this peer
461            self.process_queued_relays_for_peer(peer_id);
462        }
463    }
464
465    /// Process queued relay requests for a specific peer that just connected
466    fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
467        let _now = Instant::now();
468        let mut processed = 0;
469
470        // Collect items to process for this peer
471        let mut items_to_process = Vec::new();
472        let mut keys_to_remove = Vec::new();
473
474        // Find all items for this peer
475        for (seq, item) in &self.relay_queue.pending {
476            if item.target_peer_id == peer_id {
477                items_to_process.push(item.clone());
478                keys_to_remove.push(*seq);
479            }
480        }
481
482        // Remove items from queue
483        for key in keys_to_remove {
484            self.relay_queue.pending.shift_remove(&key);
485        }
486
487        // Process the items
488        for item in items_to_process {
489            if let Some(ch) = self.lookup_peer_connection(&peer_id) {
490                if self.relay_frame_to_connection(ch, item.frame.clone()) {
491                    self.relay_stats.requests_relayed += 1;
492                    processed += 1;
493                    trace!("Processed queued relay for peer {:?}", peer_id);
494                } else {
495                    // Failed to relay, requeue
496                    self.relay_queue.requeue_failed(item);
497                    self.relay_stats.requests_failed += 1;
498                }
499            }
500        }
501
502        self.relay_stats.current_queue_size = self.relay_queue.len();
503
504        if processed > 0 {
505            debug!(
506                "Processed {} queued relay requests for peer {:?}",
507                processed, peer_id
508            );
509        }
510    }
511
512    /// Process pending relay requests (should be called periodically)
513    pub fn process_relay_queue(&mut self) {
514        let now = Instant::now();
515        let mut processed = 0;
516        let mut failed = 0;
517
518        // Process ready relay requests
519        while let Some(item) = self.relay_queue.next_ready(now) {
520            if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
521                if self.relay_frame_to_connection(ch, item.frame.clone()) {
522                    self.relay_stats.requests_relayed += 1;
523                    processed += 1;
524                    trace!(
525                        "Successfully relayed frame to peer {:?}",
526                        item.target_peer_id
527                    );
528                } else {
529                    // Failed to relay, requeue for retry
530                    self.relay_queue.requeue_failed(item);
531                    self.relay_stats.requests_failed += 1;
532                    failed += 1;
533                }
534            } else {
535                // Peer not connected, requeue for later
536                self.relay_queue.requeue_failed(item);
537                failed += 1;
538            }
539        }
540
541        // Clean up expired requests
542        let expired = self.relay_queue.cleanup_expired(now);
543        if expired > 0 {
544            self.relay_stats.requests_timed_out += expired as u64;
545            debug!("Cleaned up {} expired relay requests", expired);
546        }
547
548        self.relay_stats.current_queue_size = self.relay_queue.len();
549
550        if processed > 0 || failed > 0 {
551            trace!(
552                "Relay queue processing: {} processed, {} failed, {} in queue",
553                processed,
554                failed,
555                self.relay_queue.len()
556            );
557        }
558    }
559
560    /// Get relay statistics for monitoring
561    pub fn relay_stats(&self) -> &RelayStats {
562        &self.relay_stats
563    }
564
565    /// Get relay queue length
566    pub fn relay_queue_len(&self) -> usize {
567        self.relay_queue.len()
568    }
569
570    /// Process `EndpointEvent`s emitted from related `Connection`s
571    ///
572    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
573    pub fn handle_event(
574        &mut self,
575        ch: ConnectionHandle,
576        event: EndpointEvent,
577    ) -> Option<ConnectionEvent> {
578        use EndpointEventInner::*;
579        match event.0 {
580            EndpointEventInner::NeedIdentifiers(now, n) => {
581                return Some(self.send_new_identifiers(now, ch, n));
582            }
583            ResetToken(remote, token) => {
584                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
585                    self.index.connection_reset_tokens.remove(old.0, old.1);
586                }
587                if self.index.connection_reset_tokens.insert(remote, token, ch) {
588                    warn!("duplicate reset token");
589                }
590            }
591            RetireConnectionId(now, seq, allow_more_cids) => {
592                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
593                    trace!("peer retired CID {}: {}", seq, cid);
594                    self.index.retire(cid);
595                    if allow_more_cids {
596                        return Some(self.send_new_identifiers(now, ch, 1));
597                    }
598                }
599            }
600            RelayPunchMeNow(target_peer_id, punch_me_now) => {
601                // Handle relay request from bootstrap node
602                let peer_id = PeerId(target_peer_id);
603                if self.queue_frame_for_peer(&peer_id, punch_me_now) {
604                    trace!(
605                        "Successfully queued PunchMeNow frame for relay to peer {:?}",
606                        peer_id
607                    );
608                } else {
609                    warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
610                }
611            }
612            SendAddressFrame(add_address_frame) => {
613                // Convert to a connection event so the connection queues the frame for transmit
614                return Some(ConnectionEvent(ConnectionEventInner::QueueAddAddress(
615                    add_address_frame,
616                )));
617            }
618            NatCandidateValidated { address, challenge } => {
619                // Handle successful NAT traversal candidate validation
620                trace!(
621                    "NAT candidate validation succeeded for {} with challenge {:016x}",
622                    address, challenge
623                );
624
625                // The validation success is primarily handled by the connection-level state machine
626                // This event serves as notification to the endpoint for potential coordination
627                // with other components or logging/metrics collection
628                debug!("NAT candidate {} validated successfully", address);
629            }
630            Drained => {
631                if let Some(conn) = self.connections.try_remove(ch.0) {
632                    self.index.remove(&conn);
633                    // Clean up peer connection mapping if this connection has a peer ID
634                    if let Some(peer_id) = conn.peer_id {
635                        self.peer_connections.remove(&peer_id);
636                        trace!("Cleaned up peer connection mapping for {:?}", peer_id);
637                    }
638                } else {
639                    // This indicates a bug in downstream code, which could cause spurious
640                    // connection loss instead of this error if the CID was (re)allocated prior to
641                    // the illegal call.
642                    error!(id = ch.0, "unknown connection drained");
643                }
644            }
645        }
646        None
647    }
648
649    /// Process an incoming UDP datagram
650    pub fn handle(
651        &mut self,
652        now: Instant,
653        remote: SocketAddr,
654        local_ip: Option<IpAddr>,
655        ecn: Option<EcnCodepoint>,
656        data: BytesMut,
657        buf: &mut Vec<u8>,
658    ) -> Option<DatagramEvent> {
659        // Partially decode packet or short-circuit if unable
660        let datagram_len = data.len();
661        let event = match PartialDecode::new(
662            data,
663            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
664            &self.config.supported_versions,
665            self.config.grease_quic_bit,
666        ) {
667            Ok((first_decode, remaining)) => DatagramConnectionEvent {
668                now,
669                remote,
670                ecn,
671                first_decode,
672                remaining,
673            },
674            Err(PacketDecodeError::UnsupportedVersion {
675                src_cid,
676                dst_cid,
677                version,
678            }) => {
679                if self.server_config.is_none() {
680                    debug!("dropping packet with unsupported version");
681                    return None;
682                }
683                trace!("sending version negotiation");
684                // Negotiate versions
685                Header::VersionNegotiate {
686                    random: self.rng.r#gen::<u8>() | 0x40,
687                    src_cid: dst_cid,
688                    dst_cid: src_cid,
689                }
690                .encode(buf);
691                // Grease with a reserved version
692                buf.write::<u32>(match version {
693                    0x0a1a_2a3a => 0x0a1a_2a4a,
694                    _ => 0x0a1a_2a3a,
695                });
696                for &version in &self.config.supported_versions {
697                    buf.write(version);
698                }
699                return Some(DatagramEvent::Response(Transmit {
700                    destination: remote,
701                    ecn: None,
702                    size: buf.len(),
703                    segment_size: None,
704                    src_ip: local_ip,
705                }));
706            }
707            Err(e) => {
708                trace!("malformed header: {}", e);
709                return None;
710            }
711        };
712
713        let addresses = FourTuple { remote, local_ip };
714        let dst_cid = event.first_decode.dst_cid();
715
716        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
717            // Handle packet on existing connection
718            match route_to {
719                RouteDatagramTo::Incoming(incoming_idx) => {
720                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
721                    let config = &self.server_config.as_ref().unwrap();
722
723                    if incoming_buffer
724                        .total_bytes
725                        .checked_add(datagram_len as u64)
726                        .is_some_and(|n| n <= config.incoming_buffer_size)
727                        && self
728                            .all_incoming_buffers_total_bytes
729                            .checked_add(datagram_len as u64)
730                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
731                    {
732                        incoming_buffer.datagrams.push(event);
733                        incoming_buffer.total_bytes += datagram_len as u64;
734                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
735                    }
736
737                    None
738                }
739                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
740                    ch,
741                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
742                )),
743            }
744        } else if event.first_decode.initial_header().is_some() {
745            // Potentially create a new connection
746
747            self.handle_first_packet(datagram_len, event, addresses, buf)
748        } else if event.first_decode.has_long_header() {
749            debug!(
750                "ignoring non-initial packet for unknown connection {}",
751                dst_cid
752            );
753            None
754        } else if !event.first_decode.is_initial()
755            && self.local_cid_generator.validate(dst_cid).is_err()
756        {
757            // If we got this far, we're receiving a seemingly valid packet for an unknown
758            // connection. Send a stateless reset if possible.
759
760            debug!("dropping packet with invalid CID");
761            None
762        } else if dst_cid.is_empty() {
763            trace!("dropping unrecognized short packet without ID");
764            None
765        } else {
766            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
767                .map(DatagramEvent::Response)
768        }
769    }
770
771    fn stateless_reset(
772        &mut self,
773        now: Instant,
774        inciting_dgram_len: usize,
775        addresses: FourTuple,
776        dst_cid: ConnectionId,
777        buf: &mut Vec<u8>,
778    ) -> Option<Transmit> {
779        if self
780            .last_stateless_reset
781            .is_some_and(|last| last + self.config.min_reset_interval > now)
782        {
783            debug!("ignoring unexpected packet within minimum stateless reset interval");
784            return None;
785        }
786
787        /// Minimum amount of padding for the stateless reset to look like a short-header packet
788        const MIN_PADDING_LEN: usize = 5;
789
790        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
791        // smaller than the inciting packet.
792        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
793            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
794            _ => {
795                debug!(
796                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
797                    inciting_dgram_len
798                );
799                return None;
800            }
801        };
802
803        debug!(
804            "sending stateless reset for {} to {}",
805            dst_cid, addresses.remote
806        );
807        self.last_stateless_reset = Some(now);
808        // Resets with at least this much padding can't possibly be distinguished from real packets
809        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
810        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
811            max_padding_len
812        } else {
813            self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
814        };
815        buf.reserve(padding_len + RESET_TOKEN_SIZE);
816        buf.resize(padding_len, 0);
817        self.rng.fill_bytes(&mut buf[0..padding_len]);
818        buf[0] = 0b0100_0000 | (buf[0] >> 2);
819        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
820
821        debug_assert!(buf.len() < inciting_dgram_len);
822
823        Some(Transmit {
824            destination: addresses.remote,
825            ecn: None,
826            size: buf.len(),
827            segment_size: None,
828            src_ip: addresses.local_ip,
829        })
830    }
831
832    /// Initiate a connection
833    pub fn connect(
834        &mut self,
835        now: Instant,
836        config: ClientConfig,
837        remote: SocketAddr,
838        server_name: &str,
839    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
840        if self.cids_exhausted() {
841            return Err(ConnectError::CidsExhausted);
842        }
843        if remote.port() == 0 || remote.ip().is_unspecified() {
844            return Err(ConnectError::InvalidRemoteAddress(remote));
845        }
846        if !self.config.supported_versions.contains(&config.version) {
847            return Err(ConnectError::UnsupportedVersion);
848        }
849
850        let remote_id = (config.initial_dst_cid_provider)();
851        trace!(initial_dcid = %remote_id);
852
853        let ch = ConnectionHandle(self.connections.vacant_key());
854        let loc_cid = self.new_cid(ch);
855        let params = TransportParameters::new(
856            &config.transport,
857            &self.config,
858            self.local_cid_generator.as_ref(),
859            loc_cid,
860            None,
861            &mut self.rng,
862        );
863        let tls = config
864            .crypto
865            .start_session(config.version, server_name, &params)?;
866
867        let conn = self.add_connection(
868            ch,
869            config.version,
870            remote_id,
871            loc_cid,
872            remote_id,
873            FourTuple {
874                remote,
875                local_ip: None,
876            },
877            now,
878            tls,
879            config.transport,
880            SideArgs::Client {
881                token_store: config.token_store,
882                server_name: server_name.into(),
883            },
884        );
885        Ok((ch, conn))
886    }
887
888    fn send_new_identifiers(
889        &mut self,
890        now: Instant,
891        ch: ConnectionHandle,
892        num: u64,
893    ) -> ConnectionEvent {
894        let mut ids = vec![];
895        for _ in 0..num {
896            let id = self.new_cid(ch);
897            let meta = &mut self.connections[ch];
898            let sequence = meta.cids_issued;
899            meta.cids_issued += 1;
900            meta.loc_cids.insert(sequence, id);
901            ids.push(IssuedCid {
902                sequence,
903                id,
904                reset_token: ResetToken::new(&*self.config.reset_key, id),
905            });
906        }
907        ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
908    }
909
910    /// Generate a connection ID for `ch`
911    fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
912        loop {
913            let cid = self.local_cid_generator.generate_cid();
914            if cid.is_empty() {
915                // Zero-length CID; nothing to track
916                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
917                return cid;
918            }
919            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
920                e.insert(ch);
921                break cid;
922            }
923        }
924    }
925
926    fn handle_first_packet(
927        &mut self,
928        datagram_len: usize,
929        event: DatagramConnectionEvent,
930        addresses: FourTuple,
931        buf: &mut Vec<u8>,
932    ) -> Option<DatagramEvent> {
933        let dst_cid = event.first_decode.dst_cid();
934        let header = event.first_decode.initial_header().unwrap();
935
936        let Some(server_config) = &self.server_config else {
937            debug!("packet for unrecognized connection {}", dst_cid);
938            return self
939                .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
940                .map(DatagramEvent::Response);
941        };
942
943        if datagram_len < MIN_INITIAL_SIZE as usize {
944            debug!("ignoring short initial for connection {}", dst_cid);
945            return None;
946        }
947
948        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
949            Ok(keys) => keys,
950            Err(UnsupportedVersion) => {
951                // This probably indicates that the user set supported_versions incorrectly in
952                // `EndpointConfig`.
953                debug!(
954                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
955                    header.version
956                );
957                return None;
958            }
959        };
960
961        if let Err(reason) = self.early_validate_first_packet(header) {
962            return Some(DatagramEvent::Response(self.initial_close(
963                header.version,
964                addresses,
965                &crypto,
966                &header.src_cid,
967                reason,
968                buf,
969            )));
970        }
971
972        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
973            Ok(packet) => packet,
974            Err(e) => {
975                trace!("unable to decode initial packet: {}", e);
976                return None;
977            }
978        };
979
980        if !packet.reserved_bits_valid() {
981            debug!("dropping connection attempt with invalid reserved bits");
982            return None;
983        }
984
985        let Header::Initial(header) = packet.header else {
986            panic!("non-initial packet in handle_first_packet()");
987        };
988
989        let server_config = self.server_config.as_ref().unwrap().clone();
990
991        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
992            Ok(token) => token,
993            Err(InvalidRetryTokenError) => {
994                debug!("rejecting invalid retry token");
995                return Some(DatagramEvent::Response(self.initial_close(
996                    header.version,
997                    addresses,
998                    &crypto,
999                    &header.src_cid,
1000                    TransportError::INVALID_TOKEN(""),
1001                    buf,
1002                )));
1003            }
1004        };
1005
1006        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
1007        self.index
1008            .insert_initial_incoming(header.dst_cid, incoming_idx);
1009
1010        Some(DatagramEvent::NewConnection(Incoming {
1011            received_at: event.now,
1012            addresses,
1013            ecn: event.ecn,
1014            packet: InitialPacket {
1015                header,
1016                header_data: packet.header_data,
1017                payload: packet.payload,
1018            },
1019            rest: event.remaining,
1020            crypto,
1021            token,
1022            incoming_idx,
1023            improper_drop_warner: IncomingImproperDropWarner,
1024        }))
1025    }
1026
1027    /// Attempt to accept this incoming connection (an error may still occur)
1028    // AcceptError cannot be made smaller without semver breakage
1029    #[allow(clippy::result_large_err)]
1030    pub fn accept(
1031        &mut self,
1032        mut incoming: Incoming,
1033        now: Instant,
1034        buf: &mut Vec<u8>,
1035        server_config: Option<Arc<ServerConfig>>,
1036    ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1037        let remote_address_validated = incoming.remote_address_validated();
1038        incoming.improper_drop_warner.dismiss();
1039        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1040        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1041
1042        let packet_number = incoming.packet.header.number.expand(0);
1043        let InitialHeader {
1044            src_cid,
1045            dst_cid,
1046            version,
1047            ..
1048        } = incoming.packet.header;
1049        let server_config =
1050            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
1051
1052        if server_config
1053            .transport
1054            .max_idle_timeout
1055            .is_some_and(|timeout| {
1056                incoming.received_at + Duration::from_millis(timeout.into()) <= now
1057            })
1058        {
1059            debug!("abandoning accept of stale initial");
1060            self.index.remove_initial(dst_cid);
1061            return Err(AcceptError {
1062                cause: ConnectionError::TimedOut,
1063                response: None,
1064            });
1065        }
1066
1067        if self.cids_exhausted() {
1068            debug!("refusing connection");
1069            self.index.remove_initial(dst_cid);
1070            return Err(AcceptError {
1071                cause: ConnectionError::CidsExhausted,
1072                response: Some(self.initial_close(
1073                    version,
1074                    incoming.addresses,
1075                    &incoming.crypto,
1076                    &src_cid,
1077                    TransportError::CONNECTION_REFUSED(""),
1078                    buf,
1079                )),
1080            });
1081        }
1082
1083        if incoming
1084            .crypto
1085            .packet
1086            .remote
1087            .decrypt(
1088                packet_number,
1089                &incoming.packet.header_data,
1090                &mut incoming.packet.payload,
1091            )
1092            .is_err()
1093        {
1094            debug!(packet_number, "failed to authenticate initial packet");
1095            self.index.remove_initial(dst_cid);
1096            return Err(AcceptError {
1097                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1098                response: None,
1099            });
1100        };
1101
1102        let ch = ConnectionHandle(self.connections.vacant_key());
1103        let loc_cid = self.new_cid(ch);
1104        let mut params = TransportParameters::new(
1105            &server_config.transport,
1106            &self.config,
1107            self.local_cid_generator.as_ref(),
1108            loc_cid,
1109            Some(&server_config),
1110            &mut self.rng,
1111        );
1112        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1113        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1114        params.retry_src_cid = incoming.token.retry_src_cid;
1115        let mut pref_addr_cid = None;
1116        if server_config.has_preferred_address() {
1117            let cid = self.new_cid(ch);
1118            pref_addr_cid = Some(cid);
1119            params.preferred_address = Some(PreferredAddress {
1120                address_v4: server_config.preferred_address_v4,
1121                address_v6: server_config.preferred_address_v6,
1122                connection_id: cid,
1123                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1124            });
1125        }
1126
1127        let tls = server_config.crypto.clone().start_session(version, &params);
1128        let transport_config = server_config.transport.clone();
1129        let mut conn = self.add_connection(
1130            ch,
1131            version,
1132            dst_cid,
1133            loc_cid,
1134            src_cid,
1135            incoming.addresses,
1136            incoming.received_at,
1137            tls,
1138            transport_config,
1139            SideArgs::Server {
1140                server_config,
1141                pref_addr_cid,
1142                path_validated: remote_address_validated,
1143            },
1144        );
1145        self.index.insert_initial(dst_cid, ch);
1146
1147        match conn.handle_first_packet(
1148            incoming.received_at,
1149            incoming.addresses.remote,
1150            incoming.ecn,
1151            packet_number,
1152            incoming.packet,
1153            incoming.rest,
1154        ) {
1155            Ok(()) => {
1156                trace!(id = ch.0, icid = %dst_cid, "new connection");
1157
1158                for event in incoming_buffer.datagrams {
1159                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1160                }
1161
1162                Ok((ch, conn))
1163            }
1164            Err(e) => {
1165                debug!("handshake failed: {}", e);
1166                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1167                let response = match e {
1168                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
1169                        version,
1170                        incoming.addresses,
1171                        &incoming.crypto,
1172                        &src_cid,
1173                        e.clone(),
1174                        buf,
1175                    )),
1176                    _ => None,
1177                };
1178                Err(AcceptError { cause: e, response })
1179            }
1180        }
1181    }
1182
1183    /// Check if we should refuse a connection attempt regardless of the packet's contents
1184    fn early_validate_first_packet(
1185        &mut self,
1186        header: &ProtectedInitialHeader,
1187    ) -> Result<(), TransportError> {
1188        let config = &self.server_config.as_ref().unwrap();
1189        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1190            return Err(TransportError::CONNECTION_REFUSED(""));
1191        }
1192
1193        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
1194        // bytes. If this is a Retry packet, then the length must instead match our usual CID
1195        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
1196        // also need to validate CID length for those after decoding the token.
1197        if header.dst_cid.len() < 8
1198            && (header.token_pos.is_empty()
1199                || header.dst_cid.len() != self.local_cid_generator.cid_len())
1200        {
1201            debug!(
1202                "rejecting connection due to invalid DCID length {}",
1203                header.dst_cid.len()
1204            );
1205            return Err(TransportError::PROTOCOL_VIOLATION(
1206                "invalid destination CID length",
1207            ));
1208        }
1209
1210        Ok(())
1211    }
1212
1213    /// Reject this incoming connection attempt
1214    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1215        self.clean_up_incoming(&incoming);
1216        incoming.improper_drop_warner.dismiss();
1217
1218        self.initial_close(
1219            incoming.packet.header.version,
1220            incoming.addresses,
1221            &incoming.crypto,
1222            &incoming.packet.header.src_cid,
1223            TransportError::CONNECTION_REFUSED(""),
1224            buf,
1225        )
1226    }
1227
1228    /// Respond with a retry packet, requiring the client to retry with address validation
1229    ///
1230    /// Errors if `incoming.may_retry()` is false.
1231    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1232        if !incoming.may_retry() {
1233            return Err(RetryError(Box::new(incoming)));
1234        }
1235
1236        self.clean_up_incoming(&incoming);
1237        incoming.improper_drop_warner.dismiss();
1238
1239        let server_config = self.server_config.as_ref().unwrap();
1240
1241        // First Initial
1242        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
1243        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
1244        // with established connections. In the unlikely event that a collision occurs
1245        // between two connections in the initial phase, both will fail fast and may be
1246        // retried by the application layer.
1247        let loc_cid = self.local_cid_generator.generate_cid();
1248
1249        let payload = TokenPayload::Retry {
1250            address: incoming.addresses.remote,
1251            orig_dst_cid: incoming.packet.header.dst_cid,
1252            issued: server_config.time_source.now(),
1253        };
1254        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1255
1256        let header = Header::Retry {
1257            src_cid: loc_cid,
1258            dst_cid: incoming.packet.header.src_cid,
1259            version: incoming.packet.header.version,
1260        };
1261
1262        let encode = header.encode(buf);
1263        buf.put_slice(&token);
1264        buf.extend_from_slice(&server_config.crypto.retry_tag(
1265            incoming.packet.header.version,
1266            &incoming.packet.header.dst_cid,
1267            buf,
1268        ));
1269        encode.finish(buf, &*incoming.crypto.header.local, None);
1270
1271        Ok(Transmit {
1272            destination: incoming.addresses.remote,
1273            ecn: None,
1274            size: buf.len(),
1275            segment_size: None,
1276            src_ip: incoming.addresses.local_ip,
1277        })
1278    }
1279
1280    /// Ignore this incoming connection attempt, not sending any packet in response
1281    ///
1282    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
1283    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
1284    pub fn ignore(&mut self, incoming: Incoming) {
1285        self.clean_up_incoming(&incoming);
1286        incoming.improper_drop_warner.dismiss();
1287    }
1288
1289    /// Clean up endpoint data structures associated with an `Incoming`.
1290    fn clean_up_incoming(&mut self, incoming: &Incoming) {
1291        self.index.remove_initial(incoming.packet.header.dst_cid);
1292        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1293        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1294    }
1295
1296    fn add_connection(
1297        &mut self,
1298        ch: ConnectionHandle,
1299        version: u32,
1300        init_cid: ConnectionId,
1301        loc_cid: ConnectionId,
1302        rem_cid: ConnectionId,
1303        addresses: FourTuple,
1304        now: Instant,
1305        tls: Box<dyn crypto::Session>,
1306        transport_config: Arc<TransportConfig>,
1307        side_args: SideArgs,
1308    ) -> Connection {
1309        let mut rng_seed = [0; 32];
1310        self.rng.fill_bytes(&mut rng_seed);
1311        let side = side_args.side();
1312        let pref_addr_cid = side_args.pref_addr_cid();
1313        let conn = Connection::new(
1314            self.config.clone(),
1315            transport_config,
1316            init_cid,
1317            loc_cid,
1318            rem_cid,
1319            addresses.remote,
1320            addresses.local_ip,
1321            tls,
1322            self.local_cid_generator.as_ref(),
1323            now,
1324            version,
1325            self.allow_mtud,
1326            rng_seed,
1327            side_args,
1328        );
1329
1330        let mut cids_issued = 0;
1331        let mut loc_cids = FxHashMap::default();
1332
1333        loc_cids.insert(cids_issued, loc_cid);
1334        cids_issued += 1;
1335
1336        if let Some(cid) = pref_addr_cid {
1337            debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1338            loc_cids.insert(cids_issued, cid);
1339            cids_issued += 1;
1340        }
1341
1342        let id = self.connections.insert(ConnectionMeta {
1343            init_cid,
1344            cids_issued,
1345            loc_cids,
1346            addresses,
1347            side,
1348            reset_token: None,
1349            peer_id: None,
1350        });
1351        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1352
1353        self.index.insert_conn(addresses, loc_cid, ch, side);
1354
1355        conn
1356    }
1357
1358    fn initial_close(
1359        &mut self,
1360        version: u32,
1361        addresses: FourTuple,
1362        crypto: &Keys,
1363        remote_id: &ConnectionId,
1364        reason: TransportError,
1365        buf: &mut Vec<u8>,
1366    ) -> Transmit {
1367        // We don't need to worry about CID collisions in initial closes because the peer
1368        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
1369        // unexpected response.
1370        let local_id = self.local_cid_generator.generate_cid();
1371        let number = PacketNumber::U8(0);
1372        let header = Header::Initial(InitialHeader {
1373            dst_cid: *remote_id,
1374            src_cid: local_id,
1375            number,
1376            token: Bytes::new(),
1377            version,
1378        });
1379
1380        let partial_encode = header.encode(buf);
1381        let max_len =
1382            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1383        frame::Close::from(reason).encode(buf, max_len);
1384        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1385        partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1386        Transmit {
1387            destination: addresses.remote,
1388            ecn: None,
1389            size: buf.len(),
1390            segment_size: None,
1391            src_ip: addresses.local_ip,
1392        }
1393    }
1394
1395    /// Access the configuration used by this endpoint
1396    pub fn config(&self) -> &EndpointConfig {
1397        &self.config
1398    }
1399
1400    /// Enable or disable address discovery for this endpoint
1401    ///
1402    /// Address discovery is enabled by default. When enabled, the endpoint will:
1403    /// - Send OBSERVED_ADDRESS frames to peers to inform them of their reflexive addresses
1404    /// - Process received OBSERVED_ADDRESS frames to learn about its own reflexive addresses
1405    /// - Integrate discovered addresses with NAT traversal for improved connectivity
1406    pub fn enable_address_discovery(&mut self, enabled: bool) {
1407        self.address_discovery_enabled = enabled;
1408        // Note: Existing connections will continue with their current setting.
1409        // New connections will use the updated setting.
1410    }
1411
1412    /// Check if address discovery is enabled
1413    pub fn address_discovery_enabled(&self) -> bool {
1414        self.address_discovery_enabled
1415    }
1416
1417    /// Get all discovered addresses across all connections
1418    ///
1419    /// Returns a list of unique socket addresses that have been observed
1420    /// by remote peers and reported via OBSERVED_ADDRESS frames.
1421    ///
1422    /// Note: This returns an empty vector in the current implementation.
1423    /// Applications should track discovered addresses at the connection level.
1424    pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1425        // TODO: Implement address tracking at the endpoint level
1426        Vec::new()
1427    }
1428
1429    /// Set a callback to be invoked when an address change is detected
1430    ///
1431    /// The callback receives the old address (if any) and the new address.
1432    /// Only one callback can be set at a time; setting a new callback replaces the previous one.
1433    pub fn set_address_change_callback<F>(&mut self, callback: F)
1434    where
1435        F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1436    {
1437        self.address_change_callback = Some(Box::new(callback));
1438    }
1439
1440    /// Clear the address change callback
1441    pub fn clear_address_change_callback(&mut self) {
1442        self.address_change_callback = None;
1443    }
1444
1445    /// Get address discovery statistics
1446    ///
1447    /// Note: This returns default statistics in the current implementation.
1448    /// Applications should track statistics at the connection level.
1449    pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1450        // TODO: Implement statistics tracking at the endpoint level
1451        AddressDiscoveryStats::default()
1452    }
1453
1454    /// Number of connections that are currently open
1455    pub fn open_connections(&self) -> usize {
1456        self.connections.len()
1457    }
1458
1459    /// Counter for the number of bytes currently used
1460    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
1461    pub fn incoming_buffer_bytes(&self) -> u64 {
1462        self.all_incoming_buffers_total_bytes
1463    }
1464
1465    #[cfg(test)]
1466    pub(crate) fn known_connections(&self) -> usize {
1467        let x = self.connections.len();
1468        debug_assert_eq!(x, self.index.connection_ids_initial.len());
1469        // Not all connections have known reset tokens
1470        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1471        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
1472        debug_assert!(x >= self.index.incoming_connection_remotes.len());
1473        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1474        x
1475    }
1476
1477    #[cfg(test)]
1478    pub(crate) fn known_cids(&self) -> usize {
1479        self.index.connection_ids.len()
1480    }
1481
1482    /// Whether we've used up 3/4 of the available CID space
1483    ///
1484    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
1485    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
1486    fn cids_exhausted(&self) -> bool {
1487        self.local_cid_generator.cid_len() <= 4
1488            && self.local_cid_generator.cid_len() != 0
1489            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1490                - self.index.connection_ids.len())
1491                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1492    }
1493}
1494
1495impl fmt::Debug for Endpoint {
1496    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1497        fmt.debug_struct("Endpoint")
1498            .field("rng", &self.rng)
1499            .field("index", &self.index)
1500            .field("connections", &self.connections)
1501            .field("config", &self.config)
1502            .field("server_config", &self.server_config)
1503            // incoming_buffers too large
1504            .field("incoming_buffers.len", &self.incoming_buffers.len())
1505            .field(
1506                "all_incoming_buffers_total_bytes",
1507                &self.all_incoming_buffers_total_bytes,
1508            )
1509            .finish()
1510    }
1511}
1512
1513/// Buffered Initial and 0-RTT messages for a pending incoming connection
1514#[derive(Default)]
1515struct IncomingBuffer {
1516    datagrams: Vec<DatagramConnectionEvent>,
1517    total_bytes: u64,
1518}
1519
1520/// Part of protocol state incoming datagrams can be routed to
1521#[derive(Copy, Clone, Debug)]
1522enum RouteDatagramTo {
1523    Incoming(usize),
1524    Connection(ConnectionHandle),
1525}
1526
1527/// Maps packets to existing connections
1528#[derive(Default, Debug)]
1529struct ConnectionIndex {
1530    /// Identifies connections based on the initial DCID the peer utilized
1531    ///
1532    /// Uses a standard `HashMap` to protect against hash collision attacks.
1533    ///
1534    /// Used by the server, not the client.
1535    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1536    /// Identifies connections based on locally created CIDs
1537    ///
1538    /// Uses a cheaper hash function since keys are locally created
1539    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1540    /// Identifies incoming connections with zero-length CIDs
1541    ///
1542    /// Uses a standard `HashMap` to protect against hash collision attacks.
1543    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1544    /// Identifies outgoing connections with zero-length CIDs
1545    ///
1546    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
1547    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
1548    /// may be established per remote. We must omit the local address from the key because we don't
1549    /// necessarily know what address we're sending from, and hence receiving at.
1550    ///
1551    /// Uses a standard `HashMap` to protect against hash collision attacks.
1552    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1553    /// Reset tokens provided by the peer for the CID each connection is currently sending to
1554    ///
1555    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
1556    /// recipient, if any.
1557    connection_reset_tokens: ResetTokenTable,
1558}
1559
1560impl ConnectionIndex {
1561    /// Associate an incoming connection with its initial destination CID
1562    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1563        if dst_cid.is_empty() {
1564            return;
1565        }
1566        self.connection_ids_initial
1567            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1568    }
1569
1570    /// Remove an association with an initial destination CID
1571    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1572        if dst_cid.is_empty() {
1573            return;
1574        }
1575        let removed = self.connection_ids_initial.remove(&dst_cid);
1576        debug_assert!(removed.is_some());
1577    }
1578
1579    /// Associate a connection with its initial destination CID
1580    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1581        if dst_cid.is_empty() {
1582            return;
1583        }
1584        self.connection_ids_initial
1585            .insert(dst_cid, RouteDatagramTo::Connection(connection));
1586    }
1587
1588    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1589    /// its current 4-tuple
1590    fn insert_conn(
1591        &mut self,
1592        addresses: FourTuple,
1593        dst_cid: ConnectionId,
1594        connection: ConnectionHandle,
1595        side: Side,
1596    ) {
1597        match dst_cid.len() {
1598            0 => match side {
1599                Side::Server => {
1600                    self.incoming_connection_remotes
1601                        .insert(addresses, connection);
1602                }
1603                Side::Client => {
1604                    self.outgoing_connection_remotes
1605                        .insert(addresses.remote, connection);
1606                }
1607            },
1608            _ => {
1609                self.connection_ids.insert(dst_cid, connection);
1610            }
1611        }
1612    }
1613
1614    /// Discard a connection ID
1615    fn retire(&mut self, dst_cid: ConnectionId) {
1616        self.connection_ids.remove(&dst_cid);
1617    }
1618
1619    /// Remove all references to a connection
1620    fn remove(&mut self, conn: &ConnectionMeta) {
1621        if conn.side.is_server() {
1622            self.remove_initial(conn.init_cid);
1623        }
1624        for cid in conn.loc_cids.values() {
1625            self.connection_ids.remove(cid);
1626        }
1627        self.incoming_connection_remotes.remove(&conn.addresses);
1628        self.outgoing_connection_remotes
1629            .remove(&conn.addresses.remote);
1630        if let Some((remote, token)) = conn.reset_token {
1631            self.connection_reset_tokens.remove(remote, token);
1632        }
1633    }
1634
1635    /// Find the existing connection that `datagram` should be routed to, if any
1636    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1637        let dst_cid = datagram.dst_cid();
1638        let is_empty_cid = dst_cid.is_empty();
1639
1640        // Fast path: Try most common lookup first (non-empty CID)
1641        if !is_empty_cid {
1642            if let Some(&ch) = self.connection_ids.get(dst_cid) {
1643                return Some(RouteDatagramTo::Connection(ch));
1644            }
1645        }
1646
1647        // Initial/0RTT packet lookup
1648        if datagram.is_initial() || datagram.is_0rtt() {
1649            if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1650                return Some(ch);
1651            }
1652        }
1653
1654        // Empty CID lookup (less common, do after fast path)
1655        if is_empty_cid {
1656            // Check incoming connections first (servers handle more incoming)
1657            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1658                return Some(RouteDatagramTo::Connection(ch));
1659            }
1660            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1661                return Some(RouteDatagramTo::Connection(ch));
1662            }
1663        }
1664
1665        // Stateless reset token lookup (least common, do last)
1666        let data = datagram.data();
1667        if data.len() < RESET_TOKEN_SIZE {
1668            return None;
1669        }
1670        self.connection_reset_tokens
1671            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1672            .cloned()
1673            .map(RouteDatagramTo::Connection)
1674    }
1675}
1676
1677#[derive(Debug)]
1678pub(crate) struct ConnectionMeta {
1679    init_cid: ConnectionId,
1680    /// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
1681    cids_issued: u64,
1682    loc_cids: FxHashMap<u64, ConnectionId>,
1683    /// Remote/local addresses the connection began with
1684    ///
1685    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1686    /// bother keeping it up to date.
1687    addresses: FourTuple,
1688    side: Side,
1689    /// Reset token provided by the peer for the CID we're currently sending to, and the address
1690    /// being sent to
1691    reset_token: Option<(SocketAddr, ResetToken)>,
1692    /// Peer ID for this connection, used for relay functionality
1693    peer_id: Option<PeerId>,
1694}
1695
/// Opaque key naming a `Connection` that an endpoint currently tracks
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ConnectionHandle(pub usize);

impl From<ConnectionHandle> for usize {
    /// Unwrap the handle into the raw index it carries
    fn from(handle: ConnectionHandle) -> Self {
        let ConnectionHandle(index) = handle;
        index
    }
}
1705
1706impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1707    type Output = ConnectionMeta;
1708    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1709        &self[ch.0]
1710    }
1711}
1712
1713impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1714    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1715        &mut self[ch.0]
1716    }
1717}
1718
1719/// Event resulting from processing a single datagram
1720pub enum DatagramEvent {
1721    /// The datagram is redirected to its `Connection`
1722    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1723    /// The datagram may result in starting a new `Connection`
1724    NewConnection(Incoming),
1725    /// Response generated directly by the endpoint
1726    Response(Transmit),
1727}
1728
1729/// An incoming connection for which the server has not yet begun its part of the handshake.
1730pub struct Incoming {
1731    received_at: Instant,
1732    addresses: FourTuple,
1733    ecn: Option<EcnCodepoint>,
1734    packet: InitialPacket,
1735    rest: Option<BytesMut>,
1736    crypto: Keys,
1737    token: IncomingToken,
1738    incoming_idx: usize,
1739    improper_drop_warner: IncomingImproperDropWarner,
1740}
1741
1742impl Incoming {
1743    /// The local IP address which was used when the peer established the connection
1744    ///
1745    /// This has the same behavior as [`Connection::local_ip`].
1746    pub fn local_ip(&self) -> Option<IpAddr> {
1747        self.addresses.local_ip
1748    }
1749
1750    /// The peer's UDP address
1751    pub fn remote_address(&self) -> SocketAddr {
1752        self.addresses.remote
1753    }
1754
1755    /// Whether the socket address that is initiating this connection has been validated
1756    ///
1757    /// This means that the sender of the initial packet has proved that they can receive traffic
1758    /// sent to `self.remote_address()`.
1759    ///
1760    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1761    /// The inverse is not guaranteed.
1762    pub fn remote_address_validated(&self) -> bool {
1763        self.token.validated
1764    }
1765
1766    /// Whether it is legal to respond with a retry packet
1767    ///
1768    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1769    /// The inverse is not guaranteed.
1770    pub fn may_retry(&self) -> bool {
1771        self.token.retry_src_cid.is_none()
1772    }
1773
1774    /// The original destination connection ID sent by the client
1775    pub fn orig_dst_cid(&self) -> &ConnectionId {
1776        &self.token.orig_dst_cid
1777    }
1778}
1779
1780impl fmt::Debug for Incoming {
1781    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1782        f.debug_struct("Incoming")
1783            .field("addresses", &self.addresses)
1784            .field("ecn", &self.ecn)
1785            // packet doesn't implement debug
1786            // rest is too big and not meaningful enough
1787            .field("token", &self.token)
1788            .field("incoming_idx", &self.incoming_idx)
1789            // improper drop warner contains no information
1790            .finish_non_exhaustive()
1791    }
1792}
1793
/// Zero-sized guard whose destructor reports that its owner was dropped improperly
struct IncomingImproperDropWarner;

impl IncomingImproperDropWarner {
    /// Consume the guard without running its destructor
    fn dismiss(self) {
        // Forgetting a field-less value leaks nothing; it only skips `Drop`.
        mem::forget(self)
    }
}
1801
1802impl Drop for IncomingImproperDropWarner {
1803    fn drop(&mut self) {
1804        warn!(
1805            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1806               (may cause memory leak and eventual inability to accept new connections)"
1807        );
1808    }
1809}
1810
1811/// Errors in the parameters being used to create a new connection
1812///
1813/// These arise before any I/O has been performed.
1814#[derive(Debug, Error, Clone, PartialEq, Eq)]
1815pub enum ConnectError {
1816    /// The endpoint can no longer create new connections
1817    ///
1818    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1819    #[error("endpoint stopping")]
1820    EndpointStopping,
1821    /// The connection could not be created because not enough of the CID space is available
1822    ///
1823    /// Try using longer connection IDs
1824    #[error("CIDs exhausted")]
1825    CidsExhausted,
1826    /// The given server name was malformed
1827    #[error("invalid server name: {0}")]
1828    InvalidServerName(String),
1829    /// The remote [`SocketAddr`] supplied was malformed
1830    ///
1831    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1832    #[error("invalid remote address: {0}")]
1833    InvalidRemoteAddress(SocketAddr),
1834    /// No default client configuration was set up
1835    ///
1836    /// Use `Endpoint::connect_with` to specify a client configuration.
1837    #[error("no default client config")]
1838    NoDefaultClientConfig,
1839    /// The local endpoint does not support the QUIC version specified in the client configuration
1840    #[error("unsupported QUIC version")]
1841    UnsupportedVersion,
1842}
1843
1844/// Error type for attempting to accept an [`Incoming`]
1845#[derive(Debug)]
1846pub struct AcceptError {
1847    /// Underlying error describing reason for failure
1848    pub cause: ConnectionError,
1849    /// Optional response to transmit back
1850    pub response: Option<Transmit>,
1851}
1852
1853/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1854#[derive(Debug, Error)]
1855#[error("retry() with validated Incoming")]
1856pub struct RetryError(Box<Incoming>);
1857
1858impl RetryError {
1859    /// Get the [`Incoming`]
1860    pub fn into_incoming(self) -> Incoming {
1861        *self.0
1862    }
1863}
1864
1865/// Reset Tokens which are associated with peer socket addresses
1866///
1867/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1868/// peer generated and might be usable for hash collision attacks.
1869#[derive(Default, Debug)]
1870struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1871
1872impl ResetTokenTable {
1873    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1874        self.0
1875            .entry(remote)
1876            .or_default()
1877            .insert(token, ch)
1878            .is_some()
1879    }
1880
1881    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1882        use std::collections::hash_map::Entry;
1883        match self.0.entry(remote) {
1884            Entry::Vacant(_) => {}
1885            Entry::Occupied(mut e) => {
1886                e.get_mut().remove(&token);
1887                if e.get().is_empty() {
1888                    e.remove_entry();
1889                }
1890            }
1891        }
1892    }
1893
1894    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1895        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1896        self.0.get(&remote)?.get(&token)
1897    }
1898}
1899
/// Key identifying a connection by its remote address together with the local IP
///
/// The local part is included so that hosts with several IP addresses on the same subnet behave
/// well when zero-length connection IDs are in use.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct FourTuple {
    /// Address of the peer
    remote: SocketAddr,
    /// Local IP, if any; the local port is not stored because a single socket can only listen
    /// on a single port
    local_ip: Option<IpAddr>,
}