ant_quic/
endpoint.rs

1use std::{
2    collections::{HashMap, VecDeque, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21    Side, Transmit, TransportConfig, TransportError,
22    cid_generator::ConnectionIdGenerator,
23    coding::BufMutExt,
24    config::{ClientConfig, EndpointConfig, ServerConfig},
25    connection::{Connection, ConnectionError, SideArgs},
26    crypto::{self, Keys, UnsupportedVersion},
27    frame,
28    nat_traversal_api::PeerId,
29    packet::{
30        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31        PacketNumber, PartialDecode, ProtectedInitialHeader,
32    },
33    shared::{
34        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35        EndpointEvent, EndpointEventInner, IssuedCid,
36    },
37    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38    transport_parameters::{PreferredAddress, TransportParameters},
39};
40
/// A queued relay request for bootstrap nodes
///
/// One entry of [`RelayQueue::pending`]. `RelayQueue::enqueue` creates it with
/// `attempts == 0` and `last_attempt == None`; `RelayQueue::next_ready` updates both
/// fields each time it hands the item out for delivery.
#[derive(Debug, Clone)]
struct RelayQueueItem {
    /// Target peer ID for the relay
    target_peer_id: PeerId,
    /// Frame to be relayed
    frame: frame::PunchMeNow,
    /// When this relay request was created; compared against the queue's request timeout
    created_at: Instant,
    /// Number of relay attempts made (incremented when the item is dequeued for delivery)
    attempts: u32,
    /// Last attempt time, or `None` if the item has never been dequeued
    last_attempt: Option<Instant>,
}
55
/// Relay queue management for bootstrap nodes
///
/// Holds relay requests that could not be delivered immediately, together with the
/// limits (queue size, timeout, retries, per-peer rate) applied to them.
#[derive(Debug)]
struct RelayQueue {
    /// Pending relay requests with insertion order and O(1) access.
    /// Keys are sequence numbers handed out from `next_seq`.
    pending: IndexMap<u64, RelayQueueItem>,
    /// Next sequence number for insertion order
    next_seq: u64,
    /// Maximum queue size to prevent memory exhaustion
    max_queue_size: usize,
    /// Timeout for relay requests, measured from `RelayQueueItem::created_at`
    request_timeout: Duration,
    /// Maximum retry attempts per request
    max_retries: u32,
    /// Minimum interval between retry attempts
    retry_interval: Duration,
    /// Rate limiting: track recent relay requests per peer.
    /// Each deque holds the timestamps of accepted requests for that target peer.
    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
    /// Maximum relays per peer per time window
    max_relays_per_peer: usize,
    /// Rate limiting time window
    rate_limit_window: Duration,
}
78
/// Address discovery statistics
///
/// Plain counters; all of them start at zero via the derived `Default`.
#[derive(Debug, Default, Clone)]
pub struct AddressDiscoveryStats {
    /// Number of OBSERVED_ADDRESS frames sent
    pub frames_sent: u64,
    /// Number of OBSERVED_ADDRESS frames received
    pub frames_received: u64,
    /// Number of unique addresses discovered
    pub addresses_discovered: u64,
    /// Number of address changes detected
    pub address_changes_detected: u64,
}
91
/// Relay statistics for monitoring and debugging
///
/// Returned by reference from `Endpoint::relay_stats`. The fields are public so that
/// callers outside this module can actually read the counters (previously they could
/// only be inspected through the `Debug` output).
#[derive(Debug, Default)]
pub struct RelayStats {
    /// Total relay requests received
    pub requests_received: u64,
    /// Successfully relayed requests
    pub requests_relayed: u64,
    /// Failed relay requests (peer not found)
    pub requests_failed: u64,
    /// Requests dropped due to queue full
    pub requests_dropped: u64,
    /// Requests timed out
    pub requests_timed_out: u64,
    /// Requests dropped due to rate limiting
    pub requests_rate_limited: u64,
    /// Current queue size
    pub current_queue_size: usize,
}
110
111impl RelayQueue {
112    /// Create a new relay queue with default settings
113    fn new() -> Self {
114        Self {
115            pending: IndexMap::new(),
116            next_seq: 0,
117            max_queue_size: 1000,                     // Reasonable default
118            request_timeout: Duration::from_secs(30), // 30 second timeout
119            max_retries: 3,
120            retry_interval: Duration::from_millis(500), // 500ms between retries
121            rate_limiter: HashMap::new(),
122            max_relays_per_peer: 10, // Max 10 relays per peer per time window
123            rate_limit_window: Duration::from_secs(60), // 1 minute window
124        }
125    }
126
127    /// Add a relay request to the queue
128    fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
129        // Check queue size limit
130        if self.pending.len() >= self.max_queue_size {
131            warn!(
132                "Relay queue full, dropping request for peer {:?}",
133                target_peer_id
134            );
135            return false;
136        }
137
138        // Check rate limit for this peer
139        if !self.check_rate_limit(target_peer_id, now) {
140            warn!(
141                "Rate limit exceeded for peer {:?}, dropping relay request",
142                target_peer_id
143            );
144            return false;
145        }
146
147        let item = RelayQueueItem {
148            target_peer_id,
149            frame,
150            created_at: now,
151            attempts: 0,
152            last_attempt: None,
153        };
154
155        let seq = self.next_seq;
156        self.next_seq += 1;
157        self.pending.insert(seq, item);
158
159        // Record this request for rate limiting
160        self.record_relay_request(target_peer_id, now);
161
162        trace!(
163            "Queued relay request for peer {:?}, queue size: {}",
164            target_peer_id,
165            self.pending.len()
166        );
167        true
168    }
169
170    /// Check if a relay request is within rate limits
171    fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
172        // Clean up old entries first
173        self.cleanup_rate_limiter(now);
174
175        // Check current request count for this peer
176        if let Some(requests) = self.rate_limiter.get(&peer_id) {
177            requests.len() < self.max_relays_per_peer
178        } else {
179            true // No previous requests, allow
180        }
181    }
182
183    /// Record a relay request for rate limiting
184    fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
185        self.rate_limiter
186            .entry(peer_id)
187            .or_insert_with(VecDeque::new)
188            .push_back(now);
189    }
190
191    /// Clean up old rate limiting entries
192    fn cleanup_rate_limiter(&mut self, now: Instant) {
193        self.rate_limiter.retain(|_, requests| {
194            requests.retain(|&request_time| {
195                now.saturating_duration_since(request_time) <= self.rate_limit_window
196            });
197            !requests.is_empty()
198        });
199    }
200
201    /// Get the next relay request that's ready to be processed
202    fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
203        // Find the first request that's ready to be retried
204        let mut expired_keys = Vec::new();
205        let mut ready_key = None;
206
207        for (seq, item) in &self.pending {
208            // Check if request has timed out
209            if now.saturating_duration_since(item.created_at) > self.request_timeout {
210                expired_keys.push(*seq);
211                continue;
212            }
213
214            // Check if it's ready for retry
215            if item.attempts == 0
216                || item.last_attempt.map_or(true, |last| {
217                    now.saturating_duration_since(last) >= self.retry_interval
218                })
219            {
220                ready_key = Some(*seq);
221                break;
222            }
223        }
224
225        // Remove expired items
226        for key in expired_keys {
227            if let Some(expired) = self.pending.shift_remove(&key) {
228                debug!(
229                    "Relay request for peer {:?} timed out after {:?}",
230                    expired.target_peer_id,
231                    now.saturating_duration_since(expired.created_at)
232                );
233            }
234        }
235
236        // Return ready item if found
237        if let Some(key) = ready_key {
238            if let Some(mut item) = self.pending.shift_remove(&key) {
239                item.attempts += 1;
240                item.last_attempt = Some(now);
241                return Some(item);
242            }
243        }
244
245        None
246    }
247
248    /// Requeue a failed relay request if it hasn't exceeded max retries
249    fn requeue_failed(&mut self, item: RelayQueueItem) {
250        if item.attempts < self.max_retries {
251            trace!(
252                "Requeuing failed relay request for peer {:?}, attempt {}/{}",
253                item.target_peer_id, item.attempts, self.max_retries
254            );
255            let seq = self.next_seq;
256            self.next_seq += 1;
257            self.pending.insert(seq, item);
258        } else {
259            debug!(
260                "Dropping relay request for peer {:?} after {} failed attempts",
261                item.target_peer_id, item.attempts
262            );
263        }
264    }
265
266    /// Clean up expired requests and return number of items cleaned
267    fn cleanup_expired(&mut self, now: Instant) -> usize {
268        let initial_len = self.pending.len();
269
270        // Collect expired keys
271        let expired_keys: Vec<u64> = self
272            .pending
273            .iter()
274            .filter_map(|(seq, item)| {
275                if now.saturating_duration_since(item.created_at) > self.request_timeout {
276                    Some(*seq)
277                } else {
278                    None
279                }
280            })
281            .collect();
282
283        // Remove expired items
284        for key in expired_keys {
285            if let Some(expired) = self.pending.shift_remove(&key) {
286                debug!(
287                    "Removing expired relay request for peer {:?}",
288                    expired.target_peer_id
289                );
290            }
291        }
292
293        initial_len - self.pending.len()
294    }
295
296    /// Get current queue length
297    fn len(&self) -> usize {
298        self.pending.len()
299    }
300}
301
/// The main entry point to the library
///
/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
/// connection-generated events via `handle` and `handle_event`.
pub struct Endpoint {
    /// Random number generator, seeded in `Endpoint::new`
    rng: StdRng,
    /// Routing state used to map incoming datagrams to connections
    index: ConnectionIndex,
    /// Per-connection bookkeeping, indexed by `ConnectionHandle`
    connections: Slab<ConnectionMeta>,
    /// Generator (and validator) for locally issued connection IDs
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    config: Arc<EndpointConfig>,
    /// Server-side configuration; `None` when this endpoint does not accept connections
    server_config: Option<Arc<ServerConfig>>,
    /// Whether the underlying UDP socket promises not to fragment packets
    allow_mtud: bool,
    /// Time at which a stateless reset was most recently sent
    last_stateless_reset: Option<Instant>,
    /// Buffered Initial and 0-RTT messages for pending incoming connections
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of the bytes held across all entries of `incoming_buffers`
    all_incoming_buffers_total_bytes: u64,
    /// Mapping from peer IDs to connection handles for relay functionality
    peer_connections: HashMap<PeerId, ConnectionHandle>,
    /// Relay queue for bootstrap nodes
    relay_queue: RelayQueue,
    /// Relay statistics
    relay_stats: RelayStats,
    /// Whether address discovery is enabled (default: true)
    address_discovery_enabled: bool,
    /// Address change callback; the arguments are presumably (previous, new) address —
    /// TODO confirm against the call site
    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
}
331
332impl Endpoint {
333    /// Create a new endpoint
334    ///
335    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
336    /// better performance. This requires that outgoing packets are never fragmented, which can be
337    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
338    ///
339    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
340    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
341    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
342    /// using [`EndpointConfig::rng_seed`].
343    pub fn new(
344        config: Arc<EndpointConfig>,
345        server_config: Option<Arc<ServerConfig>>,
346        allow_mtud: bool,
347        rng_seed: Option<[u8; 32]>,
348    ) -> Self {
349        let rng_seed = rng_seed.or(config.rng_seed);
350        Self {
351            rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
352            index: ConnectionIndex::default(),
353            connections: Slab::new(),
354            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
355            config,
356            server_config,
357            allow_mtud,
358            last_stateless_reset: None,
359            incoming_buffers: Slab::new(),
360            all_incoming_buffers_total_bytes: 0,
361            peer_connections: HashMap::new(),
362            relay_queue: RelayQueue::new(),
363            relay_stats: RelayStats::default(),
364            address_discovery_enabled: true, // Default to enabled
365            address_change_callback: None,
366        }
367    }
368
    /// Replace the server configuration, affecting new incoming connections only
    ///
    /// Passing `None` clears the stored server configuration.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
373
374    /// Register a peer ID with a connection handle for relay functionality
375    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
376        self.peer_connections.insert(peer_id, connection_handle);
377        trace!(
378            "Registered peer {:?} with connection {:?}",
379            peer_id, connection_handle
380        );
381    }
382
383    /// Unregister a peer ID from the connection mapping
384    pub fn unregister_peer(&mut self, peer_id: &PeerId) {
385        if let Some(handle) = self.peer_connections.remove(peer_id) {
386            trace!(
387                "Unregistered peer {:?} from connection {:?}",
388                peer_id, handle
389            );
390        }
391    }
392
393    /// Look up a connection handle for a given peer ID
394    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
395        self.peer_connections.get(peer_id).copied()
396    }
397
398    /// Queue a frame for relay to a target peer
399    pub(crate) fn queue_frame_for_peer(
400        &mut self,
401        peer_id: &PeerId,
402        frame: frame::PunchMeNow,
403    ) -> bool {
404        self.relay_stats.requests_received += 1;
405
406        if let Some(ch) = self.lookup_peer_connection(peer_id) {
407            // Peer is currently connected, try to relay immediately
408            if self.relay_frame_to_connection(ch, frame.clone()) {
409                self.relay_stats.requests_relayed += 1;
410                trace!(
411                    "Immediately relayed frame to peer {:?} via connection {:?}",
412                    peer_id, ch
413                );
414                return true;
415            }
416        }
417
418        // Peer not connected or immediate relay failed, queue for later
419        let now = Instant::now();
420        if self.relay_queue.enqueue(*peer_id, frame, now) {
421            self.relay_stats.current_queue_size = self.relay_queue.len();
422            trace!("Queued relay request for peer {:?}", peer_id);
423            true
424        } else {
425            // Check if it was rate limited or queue full
426            if !self.relay_queue.check_rate_limit(*peer_id, now) {
427                self.relay_stats.requests_rate_limited += 1;
428            } else {
429                self.relay_stats.requests_dropped += 1;
430            }
431            false
432        }
433    }
434
    /// Attempt to relay a frame to a specific connection
    ///
    /// NOTE: this is currently a stub. The frame argument is ignored, nothing is queued on
    /// the connection, and the function always returns `true`, so callers treat every
    /// relay to a known connection as successful.
    fn relay_frame_to_connection(
        &mut self,
        ch: ConnectionHandle,
        _frame: frame::PunchMeNow,
    ) -> bool {
        // In a complete implementation, this would queue the frame in the connection's
        // pending frames. For now, we'll just return true to indicate success.
        // The actual frame queuing would need to be implemented at the connection level.

        // TODO: Implement actual frame queuing to connection's pending frames
        trace!("Would relay frame to connection {:?}", ch);
        true
    }
449
450    /// Set the peer ID for an existing connection
451    pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
452        if let Some(connection) = self.connections.get_mut(connection_handle.0) {
453            connection.peer_id = Some(peer_id);
454            self.register_peer(peer_id, connection_handle);
455
456            // Process any queued relay requests for this peer
457            self.process_queued_relays_for_peer(peer_id);
458        }
459    }
460
461    /// Process queued relay requests for a specific peer that just connected
462    fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
463        let _now = Instant::now();
464        let mut processed = 0;
465
466        // Collect items to process for this peer
467        let mut items_to_process = Vec::new();
468        let mut keys_to_remove = Vec::new();
469
470        // Find all items for this peer
471        for (seq, item) in &self.relay_queue.pending {
472            if item.target_peer_id == peer_id {
473                items_to_process.push(item.clone());
474                keys_to_remove.push(*seq);
475            }
476        }
477
478        // Remove items from queue
479        for key in keys_to_remove {
480            self.relay_queue.pending.shift_remove(&key);
481        }
482
483        // Process the items
484        for item in items_to_process {
485            if let Some(ch) = self.lookup_peer_connection(&peer_id) {
486                if self.relay_frame_to_connection(ch, item.frame.clone()) {
487                    self.relay_stats.requests_relayed += 1;
488                    processed += 1;
489                    trace!("Processed queued relay for peer {:?}", peer_id);
490                } else {
491                    // Failed to relay, requeue
492                    self.relay_queue.requeue_failed(item);
493                    self.relay_stats.requests_failed += 1;
494                }
495            }
496        }
497
498        self.relay_stats.current_queue_size = self.relay_queue.len();
499
500        if processed > 0 {
501            debug!(
502                "Processed {} queued relay requests for peer {:?}",
503                processed, peer_id
504            );
505        }
506    }
507
508    /// Process pending relay requests (should be called periodically)
509    pub fn process_relay_queue(&mut self) {
510        let now = Instant::now();
511        let mut processed = 0;
512        let mut failed = 0;
513
514        // Process ready relay requests
515        while let Some(item) = self.relay_queue.next_ready(now) {
516            if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
517                if self.relay_frame_to_connection(ch, item.frame.clone()) {
518                    self.relay_stats.requests_relayed += 1;
519                    processed += 1;
520                    trace!(
521                        "Successfully relayed frame to peer {:?}",
522                        item.target_peer_id
523                    );
524                } else {
525                    // Failed to relay, requeue for retry
526                    self.relay_queue.requeue_failed(item);
527                    self.relay_stats.requests_failed += 1;
528                    failed += 1;
529                }
530            } else {
531                // Peer not connected, requeue for later
532                self.relay_queue.requeue_failed(item);
533                failed += 1;
534            }
535        }
536
537        // Clean up expired requests
538        let expired = self.relay_queue.cleanup_expired(now);
539        if expired > 0 {
540            self.relay_stats.requests_timed_out += expired as u64;
541            debug!("Cleaned up {} expired relay requests", expired);
542        }
543
544        self.relay_stats.current_queue_size = self.relay_queue.len();
545
546        if processed > 0 || failed > 0 {
547            trace!(
548                "Relay queue processing: {} processed, {} failed, {} in queue",
549                processed,
550                failed,
551                self.relay_queue.len()
552            );
553        }
554    }
555
    /// Get relay statistics for monitoring
    ///
    /// Returns a shared reference to the endpoint's live counters; nothing is copied
    /// or reset by this call.
    pub fn relay_stats(&self) -> &RelayStats {
        &self.relay_stats
    }
560
561    /// Get relay queue length
562    pub fn relay_queue_len(&self) -> usize {
563        self.relay_queue.len()
564    }
565
566    /// Process `EndpointEvent`s emitted from related `Connection`s
567    ///
568    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
569    pub fn handle_event(
570        &mut self,
571        ch: ConnectionHandle,
572        event: EndpointEvent,
573    ) -> Option<ConnectionEvent> {
574        use EndpointEventInner::*;
575        match event.0 {
576            EndpointEventInner::NeedIdentifiers(now, n) => {
577                return Some(self.send_new_identifiers(now, ch, n));
578            }
579            ResetToken(remote, token) => {
580                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
581                    self.index.connection_reset_tokens.remove(old.0, old.1);
582                }
583                if self.index.connection_reset_tokens.insert(remote, token, ch) {
584                    warn!("duplicate reset token");
585                }
586            }
587            RetireConnectionId(now, seq, allow_more_cids) => {
588                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
589                    trace!("peer retired CID {}: {}", seq, cid);
590                    self.index.retire(cid);
591                    if allow_more_cids {
592                        return Some(self.send_new_identifiers(now, ch, 1));
593                    }
594                }
595            }
596            RelayPunchMeNow(target_peer_id, punch_me_now) => {
597                // Handle relay request from bootstrap node
598                let peer_id = PeerId(target_peer_id);
599                if self.queue_frame_for_peer(&peer_id, punch_me_now) {
600                    trace!(
601                        "Successfully queued PunchMeNow frame for relay to peer {:?}",
602                        peer_id
603                    );
604                } else {
605                    warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
606                }
607            }
608            SendAddressFrame(add_address_frame) => {
609                // Handle bootstrap node request to send ADD_ADDRESS frame
610                trace!(
611                    "Sending ADD_ADDRESS frame: seq={}, addr={}, priority={}",
612                    add_address_frame.sequence,
613                    add_address_frame.address,
614                    add_address_frame.priority
615                );
616
617                // For now, log the frame since the queuing mechanism needs more integration
618                // TODO: Implement proper frame queuing in the connection layer
619                debug!(
620                    "ADD_ADDRESS frame ready for transmission: {:?}",
621                    add_address_frame
622                );
623            }
624            NatCandidateValidated { address, challenge } => {
625                // Handle successful NAT traversal candidate validation
626                trace!(
627                    "NAT candidate validation succeeded for {} with challenge {:016x}",
628                    address, challenge
629                );
630
631                // The validation success is primarily handled by the connection-level state machine
632                // This event serves as notification to the endpoint for potential coordination
633                // with other components or logging/metrics collection
634                debug!("NAT candidate {} validated successfully", address);
635            }
636            Drained => {
637                if let Some(conn) = self.connections.try_remove(ch.0) {
638                    self.index.remove(&conn);
639                    // Clean up peer connection mapping if this connection has a peer ID
640                    if let Some(peer_id) = conn.peer_id {
641                        self.peer_connections.remove(&peer_id);
642                        trace!("Cleaned up peer connection mapping for {:?}", peer_id);
643                    }
644                } else {
645                    // This indicates a bug in downstream code, which could cause spurious
646                    // connection loss instead of this error if the CID was (re)allocated prior to
647                    // the illegal call.
648                    error!(id = ch.0, "unknown connection drained");
649                }
650            }
651        }
652        None
653    }
654
655    /// Process an incoming UDP datagram
656    pub fn handle(
657        &mut self,
658        now: Instant,
659        remote: SocketAddr,
660        local_ip: Option<IpAddr>,
661        ecn: Option<EcnCodepoint>,
662        data: BytesMut,
663        buf: &mut Vec<u8>,
664    ) -> Option<DatagramEvent> {
665        // Partially decode packet or short-circuit if unable
666        let datagram_len = data.len();
667        let event = match PartialDecode::new(
668            data,
669            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
670            &self.config.supported_versions,
671            self.config.grease_quic_bit,
672        ) {
673            Ok((first_decode, remaining)) => DatagramConnectionEvent {
674                now,
675                remote,
676                ecn,
677                first_decode,
678                remaining,
679            },
680            Err(PacketDecodeError::UnsupportedVersion {
681                src_cid,
682                dst_cid,
683                version,
684            }) => {
685                if self.server_config.is_none() {
686                    debug!("dropping packet with unsupported version");
687                    return None;
688                }
689                trace!("sending version negotiation");
690                // Negotiate versions
691                Header::VersionNegotiate {
692                    random: self.rng.gen::<u8>() | 0x40,
693                    src_cid: dst_cid,
694                    dst_cid: src_cid,
695                }
696                .encode(buf);
697                // Grease with a reserved version
698                buf.write::<u32>(match version {
699                    0x0a1a_2a3a => 0x0a1a_2a4a,
700                    _ => 0x0a1a_2a3a,
701                });
702                for &version in &self.config.supported_versions {
703                    buf.write(version);
704                }
705                return Some(DatagramEvent::Response(Transmit {
706                    destination: remote,
707                    ecn: None,
708                    size: buf.len(),
709                    segment_size: None,
710                    src_ip: local_ip,
711                }));
712            }
713            Err(e) => {
714                trace!("malformed header: {}", e);
715                return None;
716            }
717        };
718
719        let addresses = FourTuple { remote, local_ip };
720        let dst_cid = event.first_decode.dst_cid();
721
722        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
723            // Handle packet on existing connection
724            match route_to {
725                RouteDatagramTo::Incoming(incoming_idx) => {
726                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
727                    let config = &self.server_config.as_ref().unwrap();
728
729                    if incoming_buffer
730                        .total_bytes
731                        .checked_add(datagram_len as u64)
732                        .is_some_and(|n| n <= config.incoming_buffer_size)
733                        && self
734                            .all_incoming_buffers_total_bytes
735                            .checked_add(datagram_len as u64)
736                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
737                    {
738                        incoming_buffer.datagrams.push(event);
739                        incoming_buffer.total_bytes += datagram_len as u64;
740                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
741                    }
742
743                    None
744                }
745                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
746                    ch,
747                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
748                )),
749            }
750        } else if event.first_decode.initial_header().is_some() {
751            // Potentially create a new connection
752
753            self.handle_first_packet(datagram_len, event, addresses, buf)
754        } else if event.first_decode.has_long_header() {
755            debug!(
756                "ignoring non-initial packet for unknown connection {}",
757                dst_cid
758            );
759            None
760        } else if !event.first_decode.is_initial()
761            && self.local_cid_generator.validate(dst_cid).is_err()
762        {
763            // If we got this far, we're receiving a seemingly valid packet for an unknown
764            // connection. Send a stateless reset if possible.
765
766            debug!("dropping packet with invalid CID");
767            None
768        } else if dst_cid.is_empty() {
769            trace!("dropping unrecognized short packet without ID");
770            None
771        } else {
772            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
773                .map(DatagramEvent::Response)
774        }
775    }
776
    /// Build a stateless reset in response to a packet for an unknown connection
    ///
    /// Writes random padding followed by the reset token for `dst_cid` into `buf` and
    /// returns the `Transmit` describing it. Returns `None` when a reset was already
    /// sent within `min_reset_interval`, or when the inciting datagram is too small for
    /// a strictly smaller reset to be produced.
    ///
    /// NOTE(review): the code resizes `buf` to the padding length and asserts on
    /// `buf.len()`, so it appears to assume `buf` is empty on entry — confirm with callers.
    fn stateless_reset(
        &mut self,
        now: Instant,
        inciting_dgram_len: usize,
        addresses: FourTuple,
        dst_cid: ConnectionId,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        // Rate limit: at most one stateless reset per `min_reset_interval`
        if self
            .last_stateless_reset
            .is_some_and(|last| last + self.config.min_reset_interval > now)
        {
            debug!("ignoring unexpected packet within minimum stateless reset interval");
            return None;
        }

        /// Minimum amount of padding for the stateless reset to look like a short-header packet
        const MIN_PADDING_LEN: usize = 5;

        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
        // smaller than the inciting packet.
        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
            _ => {
                debug!(
                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
                    inciting_dgram_len
                );
                return None;
            }
        };

        debug!(
            "sending stateless reset for {} to {}",
            dst_cid, addresses.remote
        );
        self.last_stateless_reset = Some(now);
        // Resets with at least this much padding can't possibly be distinguished from real packets
        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
        // Randomize the padding length only when there is room above the ideal minimum;
        // otherwise use all the padding the inciting packet allows.
        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
            max_padding_len
        } else {
            self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
        };
        buf.reserve(padding_len + RESET_TOKEN_SIZE);
        buf.resize(padding_len, 0);
        self.rng.fill_bytes(&mut buf[0..padding_len]);
        // Force the two most significant bits of the first byte to `01`, keeping the
        // remaining bits random (presumably short-header form with the fixed bit set).
        buf[0] = 0b0100_0000 | (buf[0] >> 2);
        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));

        // The reset must be strictly smaller than the packet that triggered it
        debug_assert!(buf.len() < inciting_dgram_len);

        Some(Transmit {
            destination: addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: addresses.local_ip,
        })
    }
837
838    /// Initiate a connection
839    pub fn connect(
840        &mut self,
841        now: Instant,
842        config: ClientConfig,
843        remote: SocketAddr,
844        server_name: &str,
845    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
846        if self.cids_exhausted() {
847            return Err(ConnectError::CidsExhausted);
848        }
849        if remote.port() == 0 || remote.ip().is_unspecified() {
850            return Err(ConnectError::InvalidRemoteAddress(remote));
851        }
852        if !self.config.supported_versions.contains(&config.version) {
853            return Err(ConnectError::UnsupportedVersion);
854        }
855
856        let remote_id = (config.initial_dst_cid_provider)();
857        trace!(initial_dcid = %remote_id);
858
859        let ch = ConnectionHandle(self.connections.vacant_key());
860        let loc_cid = self.new_cid(ch);
861        let params = TransportParameters::new(
862            &config.transport,
863            &self.config,
864            self.local_cid_generator.as_ref(),
865            loc_cid,
866            None,
867            &mut self.rng,
868        );
869        let tls = config
870            .crypto
871            .start_session(config.version, server_name, &params)?;
872
873        let conn = self.add_connection(
874            ch,
875            config.version,
876            remote_id,
877            loc_cid,
878            remote_id,
879            FourTuple {
880                remote,
881                local_ip: None,
882            },
883            now,
884            tls,
885            config.transport,
886            SideArgs::Client {
887                token_store: config.token_store,
888                server_name: server_name.into(),
889            },
890        );
891        Ok((ch, conn))
892    }
893
894    fn send_new_identifiers(
895        &mut self,
896        now: Instant,
897        ch: ConnectionHandle,
898        num: u64,
899    ) -> ConnectionEvent {
900        let mut ids = vec![];
901        for _ in 0..num {
902            let id = self.new_cid(ch);
903            let meta = &mut self.connections[ch];
904            let sequence = meta.cids_issued;
905            meta.cids_issued += 1;
906            meta.loc_cids.insert(sequence, id);
907            ids.push(IssuedCid {
908                sequence,
909                id,
910                reset_token: ResetToken::new(&*self.config.reset_key, id),
911            });
912        }
913        ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
914    }
915
916    /// Generate a connection ID for `ch`
917    fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
918        loop {
919            let cid = self.local_cid_generator.generate_cid();
920            if cid.is_empty() {
921                // Zero-length CID; nothing to track
922                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
923                return cid;
924            }
925            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
926                e.insert(ch);
927                break cid;
928            }
929        }
930    }
931
    /// Process the first packet received for a connection this endpoint does not know yet
    ///
    /// Outcomes:
    /// - `Some(DatagramEvent::NewConnection(..))` when the packet passes all checks and an
    ///   [`Incoming`] is created for the application to accept, refuse, retry or ignore.
    /// - `Some(DatagramEvent::Response(..))` when a reply must be sent instead: a stateless
    ///   reset (no server configuration) or an initial close (early validation failure or
    ///   invalid retry token). The reply bytes are written into `buf`.
    /// - `None` when the packet is dropped silently.
    ///
    /// # Panics
    ///
    /// Panics if `event.first_decode.initial_header()` returns `None`, or if the fully decoded
    /// packet does not carry a `Header::Initial`.
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        addresses: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        let header = event.first_decode.initial_header().unwrap();

        // Without a server configuration we cannot accept connections; answer with a stateless
        // reset if `stateless_reset` allows one.
        let Some(server_config) = &self.server_config else {
            debug!("packet for unrecognized connection {}", dst_cid);
            return self
                .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
                .map(DatagramEvent::Response);
        };

        // Datagrams smaller than `MIN_INITIAL_SIZE` are dropped without a response.
        if datagram_len < MIN_INITIAL_SIZE as usize {
            debug!("ignoring short initial for connection {}", dst_cid);
            return None;
        }

        // Derive the initial keys for this version and destination CID.
        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
            Ok(keys) => keys,
            Err(UnsupportedVersion) => {
                // This probably indicates that the user set supported_versions incorrectly in
                // `EndpointConfig`.
                debug!(
                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                    header.version
                );
                return None;
            }
        };

        // Checks that do not depend on the packet payload; on failure the peer is told why via
        // an initial close.
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                addresses,
                &crypto,
                &header.src_cid,
                reason,
                buf,
            )));
        }

        // Finish decoding using the remote header key derived above.
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        // From here on `header` is the fully decoded initial header, shadowing the protected one.
        let Header::Initial(header) = packet.header else {
            panic!("non-initial packet in handle_first_packet()");
        };

        // Cloned so that `self` can be borrowed mutably below.
        let server_config = self.server_config.as_ref().unwrap().clone();

        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
            Ok(token) => token,
            Err(InvalidRetryTokenError) => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    addresses,
                    &crypto,
                    &header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // Reserve a buffer for this pending connection and register its initial DCID so that
        // the index can associate that DCID with the buffer.
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            addresses,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
1032
    /// Attempt to accept this incoming connection (an error may still occur)
    ///
    /// `server_config` overrides the endpoint's own server configuration for this connection
    /// when it is `Some`. On success the new connection handle and [`Connection`] are returned.
    ///
    /// # Errors
    ///
    /// Returns an [`AcceptError`] whose `cause` describes the failure and whose `response`, if
    /// present, is a close packet written into `buf` that the caller should transmit:
    /// - `ConnectionError::TimedOut` (no response) if the initial is older than the configured
    ///   `max_idle_timeout`.
    /// - `ConnectionError::CidsExhausted` (with response) if the local CID space is exhausted.
    /// - A `PROTOCOL_VIOLATION` transport error (no response) if the initial packet fails to
    ///   decrypt.
    /// - Whatever error the new connection reports while handling its first packet; a response
    ///   is produced only for transport errors.
    ///
    /// # Panics
    ///
    /// Panics if `server_config` is `None` and the endpoint has no server configuration.
    // AcceptError cannot be made smaller without semver breakage
    #[allow(clippy::result_large_err)]
    pub fn accept(
        &mut self,
        mut incoming: Incoming,
        now: Instant,
        buf: &mut Vec<u8>,
        server_config: Option<Arc<ServerConfig>>,
    ) -> Result<(ConnectionHandle, Connection), AcceptError> {
        let remote_address_validated = incoming.remote_address_validated();
        // The `Incoming` is being handled explicitly, so silence its drop warning.
        incoming.improper_drop_warner.dismiss();
        // Take ownership of any datagrams buffered for this attempt and release their share of
        // the endpoint-wide byte counter.
        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;

        let packet_number = incoming.packet.header.number.expand(0);
        let InitialHeader {
            src_cid,
            dst_cid,
            version,
            ..
        } = incoming.packet.header;
        let server_config =
            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());

        // Give up if the initial has been waiting for at least the idle timeout.
        if server_config
            .transport
            .max_idle_timeout
            .is_some_and(|timeout| {
                incoming.received_at + Duration::from_millis(timeout.into()) <= now
            })
        {
            debug!("abandoning accept of stale initial");
            self.index.remove_initial(dst_cid);
            return Err(AcceptError {
                cause: ConnectionError::TimedOut,
                response: None,
            });
        }

        if self.cids_exhausted() {
            debug!("refusing connection");
            self.index.remove_initial(dst_cid);
            return Err(AcceptError {
                cause: ConnectionError::CidsExhausted,
                response: Some(self.initial_close(
                    version,
                    incoming.addresses,
                    &incoming.crypto,
                    &src_cid,
                    TransportError::CONNECTION_REFUSED(""),
                    buf,
                )),
            });
        }

        // Decrypt the initial payload in place; a failure means it could not be authenticated.
        if incoming
            .crypto
            .packet
            .remote
            .decrypt(
                packet_number,
                &incoming.packet.header_data,
                &mut incoming.packet.payload,
            )
            .is_err()
        {
            debug!(packet_number, "failed to authenticate initial packet");
            self.index.remove_initial(dst_cid);
            return Err(AcceptError {
                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
                response: None,
            });
        };

        // Reserve the handle that `add_connection` will fill, and a first local CID for it.
        let ch = ConnectionHandle(self.connections.vacant_key());
        let loc_cid = self.new_cid(ch);
        let mut params = TransportParameters::new(
            &server_config.transport,
            &self.config,
            self.local_cid_generator.as_ref(),
            loc_cid,
            Some(&server_config),
            &mut self.rng,
        );
        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
        params.retry_src_cid = incoming.token.retry_src_cid;
        // A preferred address gets its own CID and reset token.
        let mut pref_addr_cid = None;
        if server_config.has_preferred_address() {
            let cid = self.new_cid(ch);
            pref_addr_cid = Some(cid);
            params.preferred_address = Some(PreferredAddress {
                address_v4: server_config.preferred_address_v4,
                address_v6: server_config.preferred_address_v6,
                connection_id: cid,
                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
            });
        }

        let tls = server_config.crypto.clone().start_session(version, &params);
        let transport_config = server_config.transport.clone();
        let mut conn = self.add_connection(
            ch,
            version,
            dst_cid,
            loc_cid,
            src_cid,
            incoming.addresses,
            incoming.received_at,
            tls,
            transport_config,
            SideArgs::Server {
                server_config,
                pref_addr_cid,
                path_validated: remote_address_validated,
            },
        );
        // Re-point the initial DCID from the pending `Incoming` to the new connection.
        self.index.insert_initial(dst_cid, ch);

        match conn.handle_first_packet(
            incoming.received_at,
            incoming.addresses.remote,
            incoming.ecn,
            packet_number,
            incoming.packet,
            incoming.rest,
        ) {
            Ok(()) => {
                trace!(id = ch.0, icid = %dst_cid, "new connection");

                // Replay the datagrams that were buffered while the attempt was pending.
                for event in incoming_buffer.datagrams {
                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
                }

                Ok((ch, conn))
            }
            Err(e) => {
                debug!("handshake failed: {}", e);
                // Report the connection as drained so the endpoint handles it like any other
                // finished connection.
                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
                let response = match e {
                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
                        version,
                        incoming.addresses,
                        &incoming.crypto,
                        &src_cid,
                        e.clone(),
                        buf,
                    )),
                    _ => None,
                };
                Err(AcceptError { cause: e, response })
            }
        }
    }
1188
1189    /// Check if we should refuse a connection attempt regardless of the packet's contents
1190    fn early_validate_first_packet(
1191        &mut self,
1192        header: &ProtectedInitialHeader,
1193    ) -> Result<(), TransportError> {
1194        let config = &self.server_config.as_ref().unwrap();
1195        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1196            return Err(TransportError::CONNECTION_REFUSED(""));
1197        }
1198
1199        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
1200        // bytes. If this is a Retry packet, then the length must instead match our usual CID
1201        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
1202        // also need to validate CID length for those after decoding the token.
1203        if header.dst_cid.len() < 8
1204            && (header.token_pos.is_empty()
1205                || header.dst_cid.len() != self.local_cid_generator.cid_len())
1206        {
1207            debug!(
1208                "rejecting connection due to invalid DCID length {}",
1209                header.dst_cid.len()
1210            );
1211            return Err(TransportError::PROTOCOL_VIOLATION(
1212                "invalid destination CID length",
1213            ));
1214        }
1215
1216        Ok(())
1217    }
1218
1219    /// Reject this incoming connection attempt
1220    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1221        self.clean_up_incoming(&incoming);
1222        incoming.improper_drop_warner.dismiss();
1223
1224        self.initial_close(
1225            incoming.packet.header.version,
1226            incoming.addresses,
1227            &incoming.crypto,
1228            &incoming.packet.header.src_cid,
1229            TransportError::CONNECTION_REFUSED(""),
1230            buf,
1231        )
1232    }
1233
1234    /// Respond with a retry packet, requiring the client to retry with address validation
1235    ///
1236    /// Errors if `incoming.may_retry()` is false.
1237    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1238        if !incoming.may_retry() {
1239            return Err(RetryError(Box::new(incoming)));
1240        }
1241
1242        self.clean_up_incoming(&incoming);
1243        incoming.improper_drop_warner.dismiss();
1244
1245        let server_config = self.server_config.as_ref().unwrap();
1246
1247        // First Initial
1248        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
1249        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
1250        // with established connections. In the unlikely event that a collision occurs
1251        // between two connections in the initial phase, both will fail fast and may be
1252        // retried by the application layer.
1253        let loc_cid = self.local_cid_generator.generate_cid();
1254
1255        let payload = TokenPayload::Retry {
1256            address: incoming.addresses.remote,
1257            orig_dst_cid: incoming.packet.header.dst_cid,
1258            issued: server_config.time_source.now(),
1259        };
1260        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1261
1262        let header = Header::Retry {
1263            src_cid: loc_cid,
1264            dst_cid: incoming.packet.header.src_cid,
1265            version: incoming.packet.header.version,
1266        };
1267
1268        let encode = header.encode(buf);
1269        buf.put_slice(&token);
1270        buf.extend_from_slice(&server_config.crypto.retry_tag(
1271            incoming.packet.header.version,
1272            &incoming.packet.header.dst_cid,
1273            buf,
1274        ));
1275        encode.finish(buf, &*incoming.crypto.header.local, None);
1276
1277        Ok(Transmit {
1278            destination: incoming.addresses.remote,
1279            ecn: None,
1280            size: buf.len(),
1281            segment_size: None,
1282            src_ip: incoming.addresses.local_ip,
1283        })
1284    }
1285
1286    /// Ignore this incoming connection attempt, not sending any packet in response
1287    ///
1288    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
1289    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
1290    pub fn ignore(&mut self, incoming: Incoming) {
1291        self.clean_up_incoming(&incoming);
1292        incoming.improper_drop_warner.dismiss();
1293    }
1294
1295    /// Clean up endpoint data structures associated with an `Incoming`.
1296    fn clean_up_incoming(&mut self, incoming: &Incoming) {
1297        self.index.remove_initial(incoming.packet.header.dst_cid);
1298        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1299        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1300    }
1301
1302    fn add_connection(
1303        &mut self,
1304        ch: ConnectionHandle,
1305        version: u32,
1306        init_cid: ConnectionId,
1307        loc_cid: ConnectionId,
1308        rem_cid: ConnectionId,
1309        addresses: FourTuple,
1310        now: Instant,
1311        tls: Box<dyn crypto::Session>,
1312        transport_config: Arc<TransportConfig>,
1313        side_args: SideArgs,
1314    ) -> Connection {
1315        let mut rng_seed = [0; 32];
1316        self.rng.fill_bytes(&mut rng_seed);
1317        let side = side_args.side();
1318        let pref_addr_cid = side_args.pref_addr_cid();
1319        let conn = Connection::new(
1320            self.config.clone(),
1321            transport_config,
1322            init_cid,
1323            loc_cid,
1324            rem_cid,
1325            addresses.remote,
1326            addresses.local_ip,
1327            tls,
1328            self.local_cid_generator.as_ref(),
1329            now,
1330            version,
1331            self.allow_mtud,
1332            rng_seed,
1333            side_args,
1334        );
1335
1336        let mut cids_issued = 0;
1337        let mut loc_cids = FxHashMap::default();
1338
1339        loc_cids.insert(cids_issued, loc_cid);
1340        cids_issued += 1;
1341
1342        if let Some(cid) = pref_addr_cid {
1343            debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1344            loc_cids.insert(cids_issued, cid);
1345            cids_issued += 1;
1346        }
1347
1348        let id = self.connections.insert(ConnectionMeta {
1349            init_cid,
1350            cids_issued,
1351            loc_cids,
1352            addresses,
1353            side,
1354            reset_token: None,
1355            peer_id: None,
1356        });
1357        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1358
1359        self.index.insert_conn(addresses, loc_cid, ch, side);
1360
1361        conn
1362    }
1363
1364    fn initial_close(
1365        &mut self,
1366        version: u32,
1367        addresses: FourTuple,
1368        crypto: &Keys,
1369        remote_id: &ConnectionId,
1370        reason: TransportError,
1371        buf: &mut Vec<u8>,
1372    ) -> Transmit {
1373        // We don't need to worry about CID collisions in initial closes because the peer
1374        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
1375        // unexpected response.
1376        let local_id = self.local_cid_generator.generate_cid();
1377        let number = PacketNumber::U8(0);
1378        let header = Header::Initial(InitialHeader {
1379            dst_cid: *remote_id,
1380            src_cid: local_id,
1381            number,
1382            token: Bytes::new(),
1383            version,
1384        });
1385
1386        let partial_encode = header.encode(buf);
1387        let max_len =
1388            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1389        frame::Close::from(reason).encode(buf, max_len);
1390        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1391        partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1392        Transmit {
1393            destination: addresses.remote,
1394            ecn: None,
1395            size: buf.len(),
1396            segment_size: None,
1397            src_ip: addresses.local_ip,
1398        }
1399    }
1400
1401    /// Access the configuration used by this endpoint
1402    pub fn config(&self) -> &EndpointConfig {
1403        &self.config
1404    }
1405    
1406    /// Enable or disable address discovery for this endpoint
1407    /// 
1408    /// Address discovery is enabled by default. When enabled, the endpoint will:
1409    /// - Send OBSERVED_ADDRESS frames to peers to inform them of their reflexive addresses
1410    /// - Process received OBSERVED_ADDRESS frames to learn about its own reflexive addresses
1411    /// - Integrate discovered addresses with NAT traversal for improved connectivity
1412    pub fn enable_address_discovery(&mut self, enabled: bool) {
1413        self.address_discovery_enabled = enabled;
1414        // Note: Existing connections will continue with their current setting.
1415        // New connections will use the updated setting.
1416    }
1417    
1418    /// Check if address discovery is enabled
1419    pub fn address_discovery_enabled(&self) -> bool {
1420        self.address_discovery_enabled
1421    }
1422    
1423    /// Get all discovered addresses across all connections
1424    /// 
1425    /// Returns a list of unique socket addresses that have been observed
1426    /// by remote peers and reported via OBSERVED_ADDRESS frames.
1427    /// 
1428    /// Note: This returns an empty vector in the current implementation.
1429    /// Applications should track discovered addresses at the connection level.
1430    pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1431        // TODO: Implement address tracking at the endpoint level
1432        Vec::new()
1433    }
1434    
1435    /// Set a callback to be invoked when an address change is detected
1436    /// 
1437    /// The callback receives the old address (if any) and the new address.
1438    /// Only one callback can be set at a time; setting a new callback replaces the previous one.
1439    pub fn set_address_change_callback<F>(&mut self, callback: F)
1440    where
1441        F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1442    {
1443        self.address_change_callback = Some(Box::new(callback));
1444    }
1445    
1446    /// Clear the address change callback
1447    pub fn clear_address_change_callback(&mut self) {
1448        self.address_change_callback = None;
1449    }
1450    
1451    /// Get address discovery statistics
1452    /// 
1453    /// Note: This returns default statistics in the current implementation.
1454    /// Applications should track statistics at the connection level.
1455    pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1456        // TODO: Implement statistics tracking at the endpoint level
1457        AddressDiscoveryStats::default()
1458    }
1459
1460    /// Number of connections that are currently open
1461    pub fn open_connections(&self) -> usize {
1462        self.connections.len()
1463    }
1464
1465    /// Counter for the number of bytes currently used
1466    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
1467    pub fn incoming_buffer_bytes(&self) -> u64 {
1468        self.all_incoming_buffers_total_bytes
1469    }
1470
1471    #[cfg(test)]
1472    pub(crate) fn known_connections(&self) -> usize {
1473        let x = self.connections.len();
1474        debug_assert_eq!(x, self.index.connection_ids_initial.len());
1475        // Not all connections have known reset tokens
1476        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1477        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
1478        debug_assert!(x >= self.index.incoming_connection_remotes.len());
1479        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1480        x
1481    }
1482
1483    #[cfg(test)]
1484    pub(crate) fn known_cids(&self) -> usize {
1485        self.index.connection_ids.len()
1486    }
1487
1488    /// Whether we've used up 3/4 of the available CID space
1489    ///
1490    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
1491    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
1492    fn cids_exhausted(&self) -> bool {
1493        self.local_cid_generator.cid_len() <= 4
1494            && self.local_cid_generator.cid_len() != 0
1495            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1496                - self.index.connection_ids.len())
1497                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1498    }
1499}
1500
1501impl fmt::Debug for Endpoint {
1502    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1503        fmt.debug_struct("Endpoint")
1504            .field("rng", &self.rng)
1505            .field("index", &self.index)
1506            .field("connections", &self.connections)
1507            .field("config", &self.config)
1508            .field("server_config", &self.server_config)
1509            // incoming_buffers too large
1510            .field("incoming_buffers.len", &self.incoming_buffers.len())
1511            .field(
1512                "all_incoming_buffers_total_bytes",
1513                &self.all_incoming_buffers_total_bytes,
1514            )
1515            .finish()
1516    }
1517}
1518
/// Buffered Initial and 0-RTT messages for a pending incoming connection
///
/// One buffer exists per pending `Incoming`. It is removed again when the attempt is accepted,
/// refused, retried or ignored.
#[derive(Default)]
struct IncomingBuffer {
    /// Datagram events held for the attempt; they are handed to the connection once the
    /// attempt is accepted
    datagrams: Vec<DatagramConnectionEvent>,
    /// Byte count attributed to this buffer; subtracted from the endpoint-wide
    /// `all_incoming_buffers_total_bytes` counter when the buffer is removed
    total_bytes: u64,
}
1525
/// Part of protocol state incoming datagrams can be routed to
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// A pending incoming connection, identified by the key of its `IncomingBuffer` in the
    /// endpoint's `incoming_buffers`
    Incoming(usize),
    /// A connection identified by its handle
    Connection(ConnectionHandle),
}
1532
/// Maps packets to existing connections
///
/// Lookups can go through several keys: the initial destination CID chosen by the peer, a CID
/// issued locally, the remote address or four-tuple when local CIDs are zero-length, and the
/// stateless reset token supplied by the peer.
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Identifies connections based on the initial DCID the peer utilized
    ///
    /// An entry points either at a pending incoming connection or at a connection that has
    /// been accepted (see [`RouteDatagramTo`]).
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    ///
    /// Used by the server, not the client.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Identifies connections based on locally created CIDs
    ///
    /// Uses a cheaper hash function since keys are locally created
    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
    /// Identifies incoming connections with zero-length CIDs
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Identifies outgoing connections with zero-length CIDs
    ///
    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
    /// may be established per remote. We must omit the local address from the key because we don't
    /// necessarily know what address we're sending from, and hence receiving at.
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens provided by the peer for the CID each connection is currently sending to
    ///
    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
    /// recipient, if any.
    connection_reset_tokens: ResetTokenTable,
}
1565
1566impl ConnectionIndex {
1567    /// Associate an incoming connection with its initial destination CID
1568    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1569        if dst_cid.is_empty() {
1570            return;
1571        }
1572        self.connection_ids_initial
1573            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1574    }
1575
1576    /// Remove an association with an initial destination CID
1577    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1578        if dst_cid.is_empty() {
1579            return;
1580        }
1581        let removed = self.connection_ids_initial.remove(&dst_cid);
1582        debug_assert!(removed.is_some());
1583    }
1584
1585    /// Associate a connection with its initial destination CID
1586    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1587        if dst_cid.is_empty() {
1588            return;
1589        }
1590        self.connection_ids_initial
1591            .insert(dst_cid, RouteDatagramTo::Connection(connection));
1592    }
1593
1594    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1595    /// its current 4-tuple
1596    fn insert_conn(
1597        &mut self,
1598        addresses: FourTuple,
1599        dst_cid: ConnectionId,
1600        connection: ConnectionHandle,
1601        side: Side,
1602    ) {
1603        match dst_cid.len() {
1604            0 => match side {
1605                Side::Server => {
1606                    self.incoming_connection_remotes
1607                        .insert(addresses, connection);
1608                }
1609                Side::Client => {
1610                    self.outgoing_connection_remotes
1611                        .insert(addresses.remote, connection);
1612                }
1613            },
1614            _ => {
1615                self.connection_ids.insert(dst_cid, connection);
1616            }
1617        }
1618    }
1619
1620    /// Discard a connection ID
1621    fn retire(&mut self, dst_cid: ConnectionId) {
1622        self.connection_ids.remove(&dst_cid);
1623    }
1624
1625    /// Remove all references to a connection
1626    fn remove(&mut self, conn: &ConnectionMeta) {
1627        if conn.side.is_server() {
1628            self.remove_initial(conn.init_cid);
1629        }
1630        for cid in conn.loc_cids.values() {
1631            self.connection_ids.remove(cid);
1632        }
1633        self.incoming_connection_remotes.remove(&conn.addresses);
1634        self.outgoing_connection_remotes
1635            .remove(&conn.addresses.remote);
1636        if let Some((remote, token)) = conn.reset_token {
1637            self.connection_reset_tokens.remove(remote, token);
1638        }
1639    }
1640
1641    /// Find the existing connection that `datagram` should be routed to, if any
1642    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1643        let dst_cid = datagram.dst_cid();
1644        let is_empty_cid = dst_cid.is_empty();
1645
1646        // Fast path: Try most common lookup first (non-empty CID)
1647        if !is_empty_cid {
1648            if let Some(&ch) = self.connection_ids.get(dst_cid) {
1649                return Some(RouteDatagramTo::Connection(ch));
1650            }
1651        }
1652
1653        // Initial/0RTT packet lookup
1654        if datagram.is_initial() || datagram.is_0rtt() {
1655            if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1656                return Some(ch);
1657            }
1658        }
1659
1660        // Empty CID lookup (less common, do after fast path)
1661        if is_empty_cid {
1662            // Check incoming connections first (servers handle more incoming)
1663            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1664                return Some(RouteDatagramTo::Connection(ch));
1665            }
1666            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1667                return Some(RouteDatagramTo::Connection(ch));
1668            }
1669        }
1670
1671        // Stateless reset token lookup (least common, do last)
1672        let data = datagram.data();
1673        if data.len() < RESET_TOKEN_SIZE {
1674            return None;
1675        }
1676        self.connection_reset_tokens
1677            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1678            .cloned()
1679            .map(RouteDatagramTo::Connection)
1680    }
1681}
1682
1683#[derive(Debug)]
1684pub(crate) struct ConnectionMeta {
1685    init_cid: ConnectionId,
1686    /// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
1687    cids_issued: u64,
1688    loc_cids: FxHashMap<u64, ConnectionId>,
1689    /// Remote/local addresses the connection began with
1690    ///
1691    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1692    /// bother keeping it up to date.
1693    addresses: FourTuple,
1694    side: Side,
1695    /// Reset token provided by the peer for the CID we're currently sending to, and the address
1696    /// being sent to
1697    reset_token: Option<(SocketAddr, ResetToken)>,
1698    /// Peer ID for this connection, used for relay functionality
1699    peer_id: Option<PeerId>,
1700}
1701
/// Internal identifier for a `Connection` currently associated with an endpoint
///
/// A thin wrapper around a `usize`; convert back with `usize::from`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionHandle(pub usize);

impl From<ConnectionHandle> for usize {
    /// Unwrap the raw `usize` carried by the handle
    fn from(handle: ConnectionHandle) -> Self {
        let ConnectionHandle(raw) = handle;
        raw
    }
}
1711
1712impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1713    type Output = ConnectionMeta;
1714    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1715        &self[ch.0]
1716    }
1717}
1718
1719impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1720    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1721        &mut self[ch.0]
1722    }
1723}
1724
1725/// Event resulting from processing a single datagram
1726pub enum DatagramEvent {
1727    /// The datagram is redirected to its `Connection`
1728    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1729    /// The datagram may result in starting a new `Connection`
1730    NewConnection(Incoming),
1731    /// Response generated directly by the endpoint
1732    Response(Transmit),
1733}
1734
1735/// An incoming connection for which the server has not yet begun its part of the handshake.
1736pub struct Incoming {
1737    received_at: Instant,
1738    addresses: FourTuple,
1739    ecn: Option<EcnCodepoint>,
1740    packet: InitialPacket,
1741    rest: Option<BytesMut>,
1742    crypto: Keys,
1743    token: IncomingToken,
1744    incoming_idx: usize,
1745    improper_drop_warner: IncomingImproperDropWarner,
1746}
1747
1748impl Incoming {
1749    /// The local IP address which was used when the peer established the connection
1750    ///
1751    /// This has the same behavior as [`Connection::local_ip`].
1752    pub fn local_ip(&self) -> Option<IpAddr> {
1753        self.addresses.local_ip
1754    }
1755
1756    /// The peer's UDP address
1757    pub fn remote_address(&self) -> SocketAddr {
1758        self.addresses.remote
1759    }
1760
1761    /// Whether the socket address that is initiating this connection has been validated
1762    ///
1763    /// This means that the sender of the initial packet has proved that they can receive traffic
1764    /// sent to `self.remote_address()`.
1765    ///
1766    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1767    /// The inverse is not guaranteed.
1768    pub fn remote_address_validated(&self) -> bool {
1769        self.token.validated
1770    }
1771
1772    /// Whether it is legal to respond with a retry packet
1773    ///
1774    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1775    /// The inverse is not guaranteed.
1776    pub fn may_retry(&self) -> bool {
1777        self.token.retry_src_cid.is_none()
1778    }
1779
1780    /// The original destination connection ID sent by the client
1781    pub fn orig_dst_cid(&self) -> &ConnectionId {
1782        &self.token.orig_dst_cid
1783    }
1784}
1785
1786impl fmt::Debug for Incoming {
1787    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1788        f.debug_struct("Incoming")
1789            .field("addresses", &self.addresses)
1790            .field("ecn", &self.ecn)
1791            // packet doesn't implement debug
1792            // rest is too big and not meaningful enough
1793            .field("token", &self.token)
1794            .field("incoming_idx", &self.incoming_idx)
1795            // improper drop warner contains no information
1796            .finish_non_exhaustive()
1797    }
1798}
1799
/// Zero-sized guard whose `Drop` implementation logs a warning
///
/// Call [`IncomingImproperDropWarner::dismiss`] to dispose of the guard silently.
struct IncomingImproperDropWarner;

impl IncomingImproperDropWarner {
    /// Consume the guard without running its destructor, so no warning is logged
    fn dismiss(self) {
        let guard = self;
        mem::forget(guard);
    }
}
1807
1808impl Drop for IncomingImproperDropWarner {
1809    fn drop(&mut self) {
1810        warn!(
1811            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1812               (may cause memory leak and eventual inability to accept new connections)"
1813        );
1814    }
1815}
1816
1817/// Errors in the parameters being used to create a new connection
1818///
1819/// These arise before any I/O has been performed.
1820#[derive(Debug, Error, Clone, PartialEq, Eq)]
1821pub enum ConnectError {
1822    /// The endpoint can no longer create new connections
1823    ///
1824    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1825    #[error("endpoint stopping")]
1826    EndpointStopping,
1827    /// The connection could not be created because not enough of the CID space is available
1828    ///
1829    /// Try using longer connection IDs
1830    #[error("CIDs exhausted")]
1831    CidsExhausted,
1832    /// The given server name was malformed
1833    #[error("invalid server name: {0}")]
1834    InvalidServerName(String),
1835    /// The remote [`SocketAddr`] supplied was malformed
1836    ///
1837    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1838    #[error("invalid remote address: {0}")]
1839    InvalidRemoteAddress(SocketAddr),
1840    /// No default client configuration was set up
1841    ///
1842    /// Use `Endpoint::connect_with` to specify a client configuration.
1843    #[error("no default client config")]
1844    NoDefaultClientConfig,
1845    /// The local endpoint does not support the QUIC version specified in the client configuration
1846    #[error("unsupported QUIC version")]
1847    UnsupportedVersion,
1848}
1849
1850/// Error type for attempting to accept an [`Incoming`]
1851#[derive(Debug)]
1852pub struct AcceptError {
1853    /// Underlying error describing reason for failure
1854    pub cause: ConnectionError,
1855    /// Optional response to transmit back
1856    pub response: Option<Transmit>,
1857}
1858
1859/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1860#[derive(Debug, Error)]
1861#[error("retry() with validated Incoming")]
1862pub struct RetryError(Box<Incoming>);
1863
1864impl RetryError {
1865    /// Get the [`Incoming`]
1866    pub fn into_incoming(self) -> Incoming {
1867        *self.0
1868    }
1869}
1870
1871/// Reset Tokens which are associated with peer socket addresses
1872///
1873/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1874/// peer generated and might be usable for hash collision attacks.
1875#[derive(Default, Debug)]
1876struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1877
1878impl ResetTokenTable {
1879    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1880        self.0
1881            .entry(remote)
1882            .or_default()
1883            .insert(token, ch)
1884            .is_some()
1885    }
1886
1887    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1888        use std::collections::hash_map::Entry;
1889        match self.0.entry(remote) {
1890            Entry::Vacant(_) => {}
1891            Entry::Occupied(mut e) => {
1892                e.get_mut().remove(&token);
1893                if e.get().is_empty() {
1894                    e.remove_entry();
1895                }
1896            }
1897        }
1898    }
1899
1900    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1901        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1902        self.0.get(&remote)?.get(&token)
1903    }
1904}
1905
/// Key identifying a connection by its remote address together with the local IP
///
/// Taking the local side into account keeps behavior sound when the host has multiple IP
/// addresses on the same subnet and zero-length connection IDs are in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct FourTuple {
    /// Socket address of the peer
    remote: SocketAddr,
    /// Local IP, if known
    // A single socket can only listen on a single port, so no need to store it explicitly
    local_ip: Option<IpAddr>,
}