ant_quic/
endpoint.rs

1use std::{
2    collections::{HashMap, VecDeque, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21    Side, Transmit, TransportConfig, TransportError,
22    cid_generator::ConnectionIdGenerator,
23    coding::BufMutExt,
24    config::{ClientConfig, EndpointConfig, ServerConfig},
25    connection::{Connection, ConnectionError, SideArgs},
26    crypto::{self, Keys, UnsupportedVersion},
27    frame,
28    nat_traversal_api::PeerId,
29    packet::{
30        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31        PacketNumber, PartialDecode, ProtectedInitialHeader,
32    },
33    shared::{
34        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35        EndpointEvent, EndpointEventInner, IssuedCid,
36    },
37    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38    transport_parameters::{PreferredAddress, TransportParameters},
39};
40
/// A queued relay request for bootstrap nodes
///
/// Pairs a `PunchMeNow` frame with the peer it should be forwarded to, plus the bookkeeping
/// that `RelayQueue` uses to decide when the request may be retried and when it has expired.
#[derive(Debug, Clone)]
struct RelayQueueItem {
    /// Target peer ID for the relay
    target_peer_id: PeerId,
    /// Frame to be relayed
    frame: frame::PunchMeNow,
    /// When this relay request was created; the request timeout is measured from this instant
    created_at: Instant,
    /// Number of relay attempts made (bumped each time `RelayQueue::next_ready` hands the
    /// item out)
    attempts: u32,
    /// Last attempt time, or `None` if the request has never been handed out
    last_attempt: Option<Instant>,
}
55
/// Relay queue management for bootstrap nodes
///
/// Holds relay requests that could not be delivered immediately, together with the limits
/// (queue size, per-request timeout, retry budget, per-peer rate limit) that bound how much
/// memory and work the queue may consume.
#[derive(Debug)]
struct RelayQueue {
    /// Pending relay requests with insertion order and O(1) access, keyed by the sequence
    /// number taken from `next_seq` when the item was inserted
    pending: IndexMap<u64, RelayQueueItem>,
    /// Next sequence number for insertion order
    next_seq: u64,
    /// Maximum queue size to prevent memory exhaustion
    max_queue_size: usize,
    /// Timeout for relay requests, measured from `RelayQueueItem::created_at`
    request_timeout: Duration,
    /// Maximum retry attempts per request
    max_retries: u32,
    /// Minimum interval between retry attempts
    retry_interval: Duration,
    /// Rate limiting: track recent relay requests per peer (one timestamp per accepted
    /// request, keyed by the relay's target peer)
    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
    /// Maximum relays per peer per time window
    max_relays_per_peer: usize,
    /// Rate limiting time window
    rate_limit_window: Duration,
}
78
/// Address discovery statistics
///
/// A plain bag of counters; `Default` yields a value with every counter at zero.
#[derive(Debug, Default, Clone)]
pub struct AddressDiscoveryStats {
    /// Number of OBSERVED_ADDRESS frames sent
    pub frames_sent: u64,
    /// Number of OBSERVED_ADDRESS frames received
    pub frames_received: u64,
    /// Number of unique addresses discovered
    pub addresses_discovered: u64,
    /// Number of address changes detected
    pub address_changes_detected: u64,
}
91
/// Relay statistics for monitoring and debugging
///
/// A plain bag of counters maintained by the endpoint; `Default` yields a value with every
/// counter at zero. The fields are public (mirroring [`AddressDiscoveryStats`]) so that code
/// holding the reference returned by `Endpoint::relay_stats` can actually read them.
#[derive(Debug, Default, Clone)]
pub struct RelayStats {
    /// Total relay requests received
    pub requests_received: u64,
    /// Successfully relayed requests
    pub requests_relayed: u64,
    /// Relay attempts for which handing the frame to the peer's connection failed
    pub requests_failed: u64,
    /// Requests dropped due to queue full
    pub requests_dropped: u64,
    /// Requests timed out
    pub requests_timed_out: u64,
    /// Requests dropped due to rate limiting
    pub requests_rate_limited: u64,
    /// Current queue size
    pub current_queue_size: usize,
}
110
111impl RelayQueue {
112    /// Create a new relay queue with default settings
113    fn new() -> Self {
114        Self {
115            pending: IndexMap::new(),
116            next_seq: 0,
117            max_queue_size: 1000,                     // Reasonable default
118            request_timeout: Duration::from_secs(30), // 30 second timeout
119            max_retries: 3,
120            retry_interval: Duration::from_millis(500), // 500ms between retries
121            rate_limiter: HashMap::new(),
122            max_relays_per_peer: 10, // Max 10 relays per peer per time window
123            rate_limit_window: Duration::from_secs(60), // 1 minute window
124        }
125    }
126
127    /// Add a relay request to the queue
128    fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
129        // Check queue size limit
130        if self.pending.len() >= self.max_queue_size {
131            warn!(
132                "Relay queue full, dropping request for peer {:?}",
133                target_peer_id
134            );
135            return false;
136        }
137
138        // Check rate limit for this peer
139        if !self.check_rate_limit(target_peer_id, now) {
140            warn!(
141                "Rate limit exceeded for peer {:?}, dropping relay request",
142                target_peer_id
143            );
144            return false;
145        }
146
147        let item = RelayQueueItem {
148            target_peer_id,
149            frame,
150            created_at: now,
151            attempts: 0,
152            last_attempt: None,
153        };
154
155        let seq = self.next_seq;
156        self.next_seq += 1;
157        self.pending.insert(seq, item);
158
159        // Record this request for rate limiting
160        self.record_relay_request(target_peer_id, now);
161
162        trace!(
163            "Queued relay request for peer {:?}, queue size: {}",
164            target_peer_id,
165            self.pending.len()
166        );
167        true
168    }
169
170    /// Check if a relay request is within rate limits
171    fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
172        // Clean up old entries first
173        self.cleanup_rate_limiter(now);
174
175        // Check current request count for this peer
176        if let Some(requests) = self.rate_limiter.get(&peer_id) {
177            requests.len() < self.max_relays_per_peer
178        } else {
179            true // No previous requests, allow
180        }
181    }
182
183    /// Record a relay request for rate limiting
184    fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
185        self.rate_limiter.entry(peer_id).or_default().push_back(now);
186    }
187
188    /// Clean up old rate limiting entries
189    fn cleanup_rate_limiter(&mut self, now: Instant) {
190        self.rate_limiter.retain(|_, requests| {
191            requests.retain(|&request_time| {
192                now.saturating_duration_since(request_time) <= self.rate_limit_window
193            });
194            !requests.is_empty()
195        });
196    }
197
198    /// Get the next relay request that's ready to be processed
199    fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
200        // Find the first request that's ready to be retried
201        let mut expired_keys = Vec::new();
202        let mut ready_key = None;
203
204        for (seq, item) in &self.pending {
205            // Check if request has timed out
206            if now.saturating_duration_since(item.created_at) > self.request_timeout {
207                expired_keys.push(*seq);
208                continue;
209            }
210
211            // Check if it's ready for retry
212            if item.attempts == 0
213                || item
214                    .last_attempt
215                    .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
216            {
217                ready_key = Some(*seq);
218                break;
219            }
220        }
221
222        // Remove expired items
223        for key in expired_keys {
224            if let Some(expired) = self.pending.shift_remove(&key) {
225                debug!(
226                    "Relay request for peer {:?} timed out after {:?}",
227                    expired.target_peer_id,
228                    now.saturating_duration_since(expired.created_at)
229                );
230            }
231        }
232
233        // Return ready item if found
234        if let Some(key) = ready_key {
235            if let Some(mut item) = self.pending.shift_remove(&key) {
236                item.attempts += 1;
237                item.last_attempt = Some(now);
238                return Some(item);
239            }
240        }
241
242        None
243    }
244
245    /// Requeue a failed relay request if it hasn't exceeded max retries
246    fn requeue_failed(&mut self, item: RelayQueueItem) {
247        if item.attempts < self.max_retries {
248            trace!(
249                "Requeuing failed relay request for peer {:?}, attempt {}/{}",
250                item.target_peer_id, item.attempts, self.max_retries
251            );
252            let seq = self.next_seq;
253            self.next_seq += 1;
254            self.pending.insert(seq, item);
255        } else {
256            debug!(
257                "Dropping relay request for peer {:?} after {} failed attempts",
258                item.target_peer_id, item.attempts
259            );
260        }
261    }
262
263    /// Clean up expired requests and return number of items cleaned
264    fn cleanup_expired(&mut self, now: Instant) -> usize {
265        let initial_len = self.pending.len();
266
267        // Collect expired keys
268        let expired_keys: Vec<u64> = self
269            .pending
270            .iter()
271            .filter_map(|(seq, item)| {
272                if now.saturating_duration_since(item.created_at) > self.request_timeout {
273                    Some(*seq)
274                } else {
275                    None
276                }
277            })
278            .collect();
279
280        // Remove expired items
281        for key in expired_keys {
282            if let Some(expired) = self.pending.shift_remove(&key) {
283                debug!(
284                    "Removing expired relay request for peer {:?}",
285                    expired.target_peer_id
286                );
287            }
288        }
289
290        initial_len - self.pending.len()
291    }
292
293    /// Get current queue length
294    fn len(&self) -> usize {
295        self.pending.len()
296    }
297}
298
/// The main entry point to the library
///
/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
/// connection-generated events via `handle` and `handle_event`.
pub struct Endpoint {
    /// Random number generator, seeded at construction
    rng: StdRng,
    /// Routing index used to map incoming datagrams to connections or incoming buffers
    index: ConnectionIndex,
    /// Per-connection metadata, keyed by the index wrapped in `ConnectionHandle`
    connections: Slab<ConnectionMeta>,
    /// Generator (and validator) for locally issued connection IDs
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration
    config: Arc<EndpointConfig>,
    /// Server configuration; `None` means unsupported-version packets are dropped rather than
    /// answered with version negotiation
    server_config: Option<Arc<ServerConfig>>,
    /// Whether the underlying UDP socket promises not to fragment packets
    allow_mtud: bool,
    /// Time at which a stateless reset was most recently sent
    last_stateless_reset: Option<Instant>,
    /// Buffered Initial and 0-RTT messages for pending incoming connections
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of the datagram bytes currently held across all `incoming_buffers`
    all_incoming_buffers_total_bytes: u64,
    /// Mapping from peer IDs to connection handles for relay functionality
    peer_connections: HashMap<PeerId, ConnectionHandle>,
    /// Relay queue for bootstrap nodes
    relay_queue: RelayQueue,
    /// Relay statistics
    relay_stats: RelayStats,
    /// Whether address discovery is enabled (default: true)
    address_discovery_enabled: bool,
    /// Address change callback
    // NOTE(review): presumably invoked with (previous address, new address) — confirm against
    // the code that calls it, which is not visible in this part of the file.
    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
}
328
329impl Endpoint {
330    /// Create a new endpoint
331    ///
332    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
333    /// better performance. This requires that outgoing packets are never fragmented, which can be
334    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
335    ///
336    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
337    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
338    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
339    /// using [`EndpointConfig::rng_seed`].
340    pub fn new(
341        config: Arc<EndpointConfig>,
342        server_config: Option<Arc<ServerConfig>>,
343        allow_mtud: bool,
344        rng_seed: Option<[u8; 32]>,
345    ) -> Self {
346        let rng_seed = rng_seed.or(config.rng_seed);
347        Self {
348            rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
349            index: ConnectionIndex::default(),
350            connections: Slab::new(),
351            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
352            config,
353            server_config,
354            allow_mtud,
355            last_stateless_reset: None,
356            incoming_buffers: Slab::new(),
357            all_incoming_buffers_total_bytes: 0,
358            peer_connections: HashMap::new(),
359            relay_queue: RelayQueue::new(),
360            relay_stats: RelayStats::default(),
361            address_discovery_enabled: true, // Default to enabled
362            address_change_callback: None,
363        }
364    }
365
    /// Replace the server configuration, affecting new incoming connections only
    ///
    /// Passing `None` removes the server configuration altogether; the previously stored
    /// configuration (if any) is dropped from this endpoint.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
370
371    /// Register a peer ID with a connection handle for relay functionality
372    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
373        self.peer_connections.insert(peer_id, connection_handle);
374        trace!(
375            "Registered peer {:?} with connection {:?}",
376            peer_id, connection_handle
377        );
378    }
379
380    /// Unregister a peer ID from the connection mapping
381    pub fn unregister_peer(&mut self, peer_id: &PeerId) {
382        if let Some(handle) = self.peer_connections.remove(peer_id) {
383            trace!(
384                "Unregistered peer {:?} from connection {:?}",
385                peer_id, handle
386            );
387        }
388    }
389
390    /// Look up a connection handle for a given peer ID
391    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
392        self.peer_connections.get(peer_id).copied()
393    }
394
395    /// Queue a frame for relay to a target peer
396    pub(crate) fn queue_frame_for_peer(
397        &mut self,
398        peer_id: &PeerId,
399        frame: frame::PunchMeNow,
400    ) -> bool {
401        self.relay_stats.requests_received += 1;
402
403        if let Some(ch) = self.lookup_peer_connection(peer_id) {
404            // Peer is currently connected, try to relay immediately
405            if self.relay_frame_to_connection(ch, frame.clone()) {
406                self.relay_stats.requests_relayed += 1;
407                trace!(
408                    "Immediately relayed frame to peer {:?} via connection {:?}",
409                    peer_id, ch
410                );
411                return true;
412            }
413        }
414
415        // Peer not connected or immediate relay failed, queue for later
416        let now = Instant::now();
417        if self.relay_queue.enqueue(*peer_id, frame, now) {
418            self.relay_stats.current_queue_size = self.relay_queue.len();
419            trace!("Queued relay request for peer {:?}", peer_id);
420            true
421        } else {
422            // Check if it was rate limited or queue full
423            if !self.relay_queue.check_rate_limit(*peer_id, now) {
424                self.relay_stats.requests_rate_limited += 1;
425            } else {
426                self.relay_stats.requests_dropped += 1;
427            }
428            false
429        }
430    }
431
    /// Attempt to relay a frame to a specific connection
    ///
    /// Returns `true` if the frame was handed to the connection. This is currently a stub: it
    /// only logs the target connection, discards the frame and always reports success, so the
    /// failure branches in its callers cannot be reached yet.
    fn relay_frame_to_connection(
        &mut self,
        ch: ConnectionHandle,
        _frame: frame::PunchMeNow,
    ) -> bool {
        // In a complete implementation, this would queue the frame in the connection's
        // pending frames. For now, we'll just return true to indicate success.
        // The actual frame queuing would need to be implemented at the connection level.

        // TODO: Implement actual frame queuing to connection's pending frames
        trace!("Would relay frame to connection {:?}", ch);
        true
    }
446
447    /// Set the peer ID for an existing connection
448    pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
449        if let Some(connection) = self.connections.get_mut(connection_handle.0) {
450            connection.peer_id = Some(peer_id);
451            self.register_peer(peer_id, connection_handle);
452
453            // Process any queued relay requests for this peer
454            self.process_queued_relays_for_peer(peer_id);
455        }
456    }
457
458    /// Process queued relay requests for a specific peer that just connected
459    fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
460        let _now = Instant::now();
461        let mut processed = 0;
462
463        // Collect items to process for this peer
464        let mut items_to_process = Vec::new();
465        let mut keys_to_remove = Vec::new();
466
467        // Find all items for this peer
468        for (seq, item) in &self.relay_queue.pending {
469            if item.target_peer_id == peer_id {
470                items_to_process.push(item.clone());
471                keys_to_remove.push(*seq);
472            }
473        }
474
475        // Remove items from queue
476        for key in keys_to_remove {
477            self.relay_queue.pending.shift_remove(&key);
478        }
479
480        // Process the items
481        for item in items_to_process {
482            if let Some(ch) = self.lookup_peer_connection(&peer_id) {
483                if self.relay_frame_to_connection(ch, item.frame.clone()) {
484                    self.relay_stats.requests_relayed += 1;
485                    processed += 1;
486                    trace!("Processed queued relay for peer {:?}", peer_id);
487                } else {
488                    // Failed to relay, requeue
489                    self.relay_queue.requeue_failed(item);
490                    self.relay_stats.requests_failed += 1;
491                }
492            }
493        }
494
495        self.relay_stats.current_queue_size = self.relay_queue.len();
496
497        if processed > 0 {
498            debug!(
499                "Processed {} queued relay requests for peer {:?}",
500                processed, peer_id
501            );
502        }
503    }
504
505    /// Process pending relay requests (should be called periodically)
506    pub fn process_relay_queue(&mut self) {
507        let now = Instant::now();
508        let mut processed = 0;
509        let mut failed = 0;
510
511        // Process ready relay requests
512        while let Some(item) = self.relay_queue.next_ready(now) {
513            if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
514                if self.relay_frame_to_connection(ch, item.frame.clone()) {
515                    self.relay_stats.requests_relayed += 1;
516                    processed += 1;
517                    trace!(
518                        "Successfully relayed frame to peer {:?}",
519                        item.target_peer_id
520                    );
521                } else {
522                    // Failed to relay, requeue for retry
523                    self.relay_queue.requeue_failed(item);
524                    self.relay_stats.requests_failed += 1;
525                    failed += 1;
526                }
527            } else {
528                // Peer not connected, requeue for later
529                self.relay_queue.requeue_failed(item);
530                failed += 1;
531            }
532        }
533
534        // Clean up expired requests
535        let expired = self.relay_queue.cleanup_expired(now);
536        if expired > 0 {
537            self.relay_stats.requests_timed_out += expired as u64;
538            debug!("Cleaned up {} expired relay requests", expired);
539        }
540
541        self.relay_stats.current_queue_size = self.relay_queue.len();
542
543        if processed > 0 || failed > 0 {
544            trace!(
545                "Relay queue processing: {} processed, {} failed, {} in queue",
546                processed,
547                failed,
548                self.relay_queue.len()
549            );
550        }
551    }
552
    /// Get relay statistics for monitoring
    ///
    /// Returns a shared reference to the endpoint's live counters; nothing is copied or reset.
    pub fn relay_stats(&self) -> &RelayStats {
        &self.relay_stats
    }
557
558    /// Get relay queue length
559    pub fn relay_queue_len(&self) -> usize {
560        self.relay_queue.len()
561    }
562
563    /// Process `EndpointEvent`s emitted from related `Connection`s
564    ///
565    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
566    pub fn handle_event(
567        &mut self,
568        ch: ConnectionHandle,
569        event: EndpointEvent,
570    ) -> Option<ConnectionEvent> {
571        use EndpointEventInner::*;
572        match event.0 {
573            EndpointEventInner::NeedIdentifiers(now, n) => {
574                return Some(self.send_new_identifiers(now, ch, n));
575            }
576            ResetToken(remote, token) => {
577                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
578                    self.index.connection_reset_tokens.remove(old.0, old.1);
579                }
580                if self.index.connection_reset_tokens.insert(remote, token, ch) {
581                    warn!("duplicate reset token");
582                }
583            }
584            RetireConnectionId(now, seq, allow_more_cids) => {
585                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
586                    trace!("peer retired CID {}: {}", seq, cid);
587                    self.index.retire(cid);
588                    if allow_more_cids {
589                        return Some(self.send_new_identifiers(now, ch, 1));
590                    }
591                }
592            }
593            RelayPunchMeNow(target_peer_id, punch_me_now) => {
594                // Handle relay request from bootstrap node
595                let peer_id = PeerId(target_peer_id);
596                if self.queue_frame_for_peer(&peer_id, punch_me_now) {
597                    trace!(
598                        "Successfully queued PunchMeNow frame for relay to peer {:?}",
599                        peer_id
600                    );
601                } else {
602                    warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
603                }
604            }
605            SendAddressFrame(add_address_frame) => {
606                // Handle bootstrap node request to send ADD_ADDRESS frame
607                trace!(
608                    "Sending ADD_ADDRESS frame: seq={}, addr={}, priority={}",
609                    add_address_frame.sequence,
610                    add_address_frame.address,
611                    add_address_frame.priority
612                );
613
614                // For now, log the frame since the queuing mechanism needs more integration
615                // TODO: Implement proper frame queuing in the connection layer
616                debug!(
617                    "ADD_ADDRESS frame ready for transmission: {:?}",
618                    add_address_frame
619                );
620            }
621            NatCandidateValidated { address, challenge } => {
622                // Handle successful NAT traversal candidate validation
623                trace!(
624                    "NAT candidate validation succeeded for {} with challenge {:016x}",
625                    address, challenge
626                );
627
628                // The validation success is primarily handled by the connection-level state machine
629                // This event serves as notification to the endpoint for potential coordination
630                // with other components or logging/metrics collection
631                debug!("NAT candidate {} validated successfully", address);
632            }
633            Drained => {
634                if let Some(conn) = self.connections.try_remove(ch.0) {
635                    self.index.remove(&conn);
636                    // Clean up peer connection mapping if this connection has a peer ID
637                    if let Some(peer_id) = conn.peer_id {
638                        self.peer_connections.remove(&peer_id);
639                        trace!("Cleaned up peer connection mapping for {:?}", peer_id);
640                    }
641                } else {
642                    // This indicates a bug in downstream code, which could cause spurious
643                    // connection loss instead of this error if the CID was (re)allocated prior to
644                    // the illegal call.
645                    error!(id = ch.0, "unknown connection drained");
646                }
647            }
648        }
649        None
650    }
651
    /// Process an incoming UDP datagram
    ///
    /// `remote` and `local_ip` identify the path the datagram arrived on, `ecn` is its ECN
    /// codepoint (if known) and `data` is the raw datagram. `buf` receives the bytes of any
    /// response this endpoint generates itself (version negotiation or stateless reset); the
    /// returned `Transmit` then describes where to send them.
    ///
    /// Returns `None` when the datagram is dropped or merely buffered for a pending incoming
    /// connection. Otherwise returns the event to forward to an existing connection, the
    /// result of first-packet handling, or a response to transmit.
    pub fn handle(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        ecn: Option<EcnCodepoint>,
        data: BytesMut,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        // Partially decode packet or short-circuit if unable
        let datagram_len = data.len();
        let event = match PartialDecode::new(
            data,
            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
            &self.config.supported_versions,
            self.config.grease_quic_bit,
        ) {
            Ok((first_decode, remaining)) => DatagramConnectionEvent {
                now,
                remote,
                ecn,
                first_decode,
                remaining,
            },
            Err(PacketDecodeError::UnsupportedVersion {
                src_cid,
                dst_cid,
                version,
            }) => {
                // Without a server config this endpoint does not answer unknown versions
                if self.server_config.is_none() {
                    debug!("dropping packet with unsupported version");
                    return None;
                }
                trace!("sending version negotiation");
                // Negotiate versions (the CIDs are swapped so the reply is addressed to the
                // sender)
                Header::VersionNegotiate {
                    random: self.rng.r#gen::<u8>() | 0x40,
                    src_cid: dst_cid,
                    dst_cid: src_cid,
                }
                .encode(buf);
                // Grease with a reserved version (one that differs from what the peer sent)
                buf.write::<u32>(match version {
                    0x0a1a_2a3a => 0x0a1a_2a4a,
                    _ => 0x0a1a_2a3a,
                });
                for &version in &self.config.supported_versions {
                    buf.write(version);
                }
                return Some(DatagramEvent::Response(Transmit {
                    destination: remote,
                    ecn: None,
                    size: buf.len(),
                    segment_size: None,
                    src_ip: local_ip,
                }));
            }
            Err(e) => {
                trace!("malformed header: {}", e);
                return None;
            }
        };

        let addresses = FourTuple { remote, local_ip };
        let dst_cid = event.first_decode.dst_cid();

        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
            // Handle packet on existing connection
            match route_to {
                RouteDatagramTo::Incoming(incoming_idx) => {
                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
                    let config = &self.server_config.as_ref().unwrap();

                    // Buffer the datagram only if it fits within both the per-connection and
                    // the endpoint-wide byte budgets; otherwise it is silently discarded.
                    if incoming_buffer
                        .total_bytes
                        .checked_add(datagram_len as u64)
                        .is_some_and(|n| n <= config.incoming_buffer_size)
                        && self
                            .all_incoming_buffers_total_bytes
                            .checked_add(datagram_len as u64)
                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
                    {
                        incoming_buffer.datagrams.push(event);
                        incoming_buffer.total_bytes += datagram_len as u64;
                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
                    }

                    None
                }
                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
                    ch,
                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
                )),
            }
        } else if event.first_decode.initial_header().is_some() {
            // Potentially create a new connection

            self.handle_first_packet(datagram_len, event, addresses, buf)
        } else if event.first_decode.has_long_header() {
            debug!(
                "ignoring non-initial packet for unknown connection {}",
                dst_cid
            );
            None
        } else if !event.first_decode.is_initial()
            && self.local_cid_generator.validate(dst_cid).is_err()
        {
            // The destination CID fails the local generator's validation, so it is not
            // treated as one of ours and no stateless reset is sent.

            debug!("dropping packet with invalid CID");
            None
        } else if dst_cid.is_empty() {
            trace!("dropping unrecognized short packet without ID");
            None
        } else {
            // If we got this far, we're receiving a seemingly valid packet for an unknown
            // connection. Send a stateless reset if possible.
            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
                .map(DatagramEvent::Response)
        }
    }
773
    /// Build a stateless reset in response to a packet for an unknown connection
    ///
    /// Writes random padding followed by the reset token derived from the endpoint's reset key
    /// and `dst_cid` into `buf`, and returns the `Transmit` addressed to the sender of the
    /// inciting datagram.
    ///
    /// Returns `None` without touching `buf` if a reset was sent less than
    /// `min_reset_interval` ago, or if the inciting datagram (`inciting_dgram_len` bytes) is
    /// too small for a strictly smaller reset to be produced.
    fn stateless_reset(
        &mut self,
        now: Instant,
        inciting_dgram_len: usize,
        addresses: FourTuple,
        dst_cid: ConnectionId,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        // Rate-limit resets across the whole endpoint
        if self
            .last_stateless_reset
            .is_some_and(|last| last + self.config.min_reset_interval > now)
        {
            debug!("ignoring unexpected packet within minimum stateless reset interval");
            return None;
        }

        /// Minimum amount of padding for the stateless reset to look like a short-header packet
        const MIN_PADDING_LEN: usize = 5;

        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
        // smaller than the inciting packet.
        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
            _ => {
                debug!(
                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
                    inciting_dgram_len
                );
                return None;
            }
        };

        debug!(
            "sending stateless reset for {} to {}",
            dst_cid, addresses.remote
        );
        self.last_stateless_reset = Some(now);
        // Resets with at least this much padding can't possibly be distinguished from real packets
        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
        // Use all the available padding when the inciting packet is small; otherwise pick a
        // random length between the ideal minimum and the cap (exclusive).
        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
            max_padding_len
        } else {
            self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
        };
        buf.reserve(padding_len + RESET_TOKEN_SIZE);
        // NOTE(review): `resize` sets the buffer length to `padding_len`, which presumes `buf`
        // is empty on entry — confirm that callers always pass a cleared buffer.
        buf.resize(padding_len, 0);
        self.rng.fill_bytes(&mut buf[0..padding_len]);
        // Clear the two high bits of the first byte and set 0x40, keeping the rest random
        buf[0] = 0b0100_0000 | (buf[0] >> 2);
        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));

        debug_assert!(buf.len() < inciting_dgram_len);

        Some(Transmit {
            destination: addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: addresses.local_ip,
        })
    }
834
835    /// Initiate a connection
836    pub fn connect(
837        &mut self,
838        now: Instant,
839        config: ClientConfig,
840        remote: SocketAddr,
841        server_name: &str,
842    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
843        if self.cids_exhausted() {
844            return Err(ConnectError::CidsExhausted);
845        }
846        if remote.port() == 0 || remote.ip().is_unspecified() {
847            return Err(ConnectError::InvalidRemoteAddress(remote));
848        }
849        if !self.config.supported_versions.contains(&config.version) {
850            return Err(ConnectError::UnsupportedVersion);
851        }
852
853        let remote_id = (config.initial_dst_cid_provider)();
854        trace!(initial_dcid = %remote_id);
855
856        let ch = ConnectionHandle(self.connections.vacant_key());
857        let loc_cid = self.new_cid(ch);
858        let params = TransportParameters::new(
859            &config.transport,
860            &self.config,
861            self.local_cid_generator.as_ref(),
862            loc_cid,
863            None,
864            &mut self.rng,
865        );
866        let tls = config
867            .crypto
868            .start_session(config.version, server_name, &params)?;
869
870        let conn = self.add_connection(
871            ch,
872            config.version,
873            remote_id,
874            loc_cid,
875            remote_id,
876            FourTuple {
877                remote,
878                local_ip: None,
879            },
880            now,
881            tls,
882            config.transport,
883            SideArgs::Client {
884                token_store: config.token_store,
885                server_name: server_name.into(),
886            },
887        );
888        Ok((ch, conn))
889    }
890
891    fn send_new_identifiers(
892        &mut self,
893        now: Instant,
894        ch: ConnectionHandle,
895        num: u64,
896    ) -> ConnectionEvent {
897        let mut ids = vec![];
898        for _ in 0..num {
899            let id = self.new_cid(ch);
900            let meta = &mut self.connections[ch];
901            let sequence = meta.cids_issued;
902            meta.cids_issued += 1;
903            meta.loc_cids.insert(sequence, id);
904            ids.push(IssuedCid {
905                sequence,
906                id,
907                reset_token: ResetToken::new(&*self.config.reset_key, id),
908            });
909        }
910        ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
911    }
912
913    /// Generate a connection ID for `ch`
914    fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
915        loop {
916            let cid = self.local_cid_generator.generate_cid();
917            if cid.is_empty() {
918                // Zero-length CID; nothing to track
919                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
920                return cid;
921            }
922            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
923                e.insert(ch);
924                break cid;
925            }
926        }
927    }
928
    /// Process an Initial packet that did not match any existing route
    ///
    /// Returns:
    /// - `Some(DatagramEvent::NewConnection(..))` when the packet passes every check below; an
    ///   incoming buffer and an initial-DCID index entry are registered for the attempt first
    /// - `Some(DatagramEvent::Response(..))` when a reply was written to `buf`: a stateless reset
    ///   if no server configuration is set, or an Initial close for refused/invalid attempts
    /// - `None` when the packet is dropped without a reply
    ///
    /// # Panics
    ///
    /// Panics if `event.first_decode` does not carry an Initial header.
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        addresses: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        let header = event.first_decode.initial_header().unwrap();

        // Without a server configuration we cannot accept connections; the best we can do is
        // attempt a stateless reset.
        let Some(server_config) = &self.server_config else {
            debug!("packet for unrecognized connection {}", dst_cid);
            return self
                .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
                .map(DatagramEvent::Response);
        };

        // Datagrams carrying a first Initial must be at least MIN_INITIAL_SIZE bytes
        if datagram_len < MIN_INITIAL_SIZE as usize {
            debug!("ignoring short initial for connection {}", dst_cid);
            return None;
        }

        // Initial keys are derived from the packet's version and destination CID
        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
            Ok(keys) => keys,
            Err(UnsupportedVersion) => {
                // This probably indicates that the user set supported_versions incorrectly in
                // `EndpointConfig`.
                debug!(
                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                    header.version
                );
                return None;
            }
        };

        // Checks that need only the still-protected header; failures are answered with a close
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                addresses,
                &crypto,
                &header.src_cid,
                reason,
                buf,
            )));
        }

        // Finish decoding the packet using the remote header key
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        let Header::Initial(header) = packet.header else {
            panic!("non-initial packet in handle_first_packet()");
        };

        // Clone the shared config so the borrow of `self.server_config` does not outlive this
        // point; `self` is mutated below.
        let server_config = self.server_config.as_ref().unwrap().clone();

        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
            Ok(token) => token,
            Err(InvalidRetryTokenError) => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    addresses,
                    &crypto,
                    &header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // Reserve a buffer slot for this attempt and route its initial DCID to that slot
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            addresses,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
1029
    /// Attempt to accept this incoming connection (an error may still occur)
    ///
    /// `server_config`, when provided, is used for this connection instead of the endpoint's
    /// own server configuration.
    ///
    /// On success, returns the new connection's handle and state machine; datagrams buffered
    /// for the attempt have already been fed to the connection.
    ///
    /// On failure, the returned [`AcceptError`] carries the cause and, for some causes, a
    /// response packet that has been written to `buf`.
    ///
    /// # Panics
    ///
    /// Panics if `server_config` is `None` and the endpoint has no server configuration.
    // AcceptError cannot be made smaller without semver breakage
    #[allow(clippy::result_large_err)]
    pub fn accept(
        &mut self,
        mut incoming: Incoming,
        now: Instant,
        buf: &mut Vec<u8>,
        server_config: Option<Arc<ServerConfig>>,
    ) -> Result<(ConnectionHandle, Connection), AcceptError> {
        let remote_address_validated = incoming.remote_address_validated();
        incoming.improper_drop_warner.dismiss();
        // Take ownership of the datagrams buffered for this attempt and release their share of
        // the endpoint-wide byte counter
        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;

        let packet_number = incoming.packet.header.number.expand(0);
        let InitialHeader {
            src_cid,
            dst_cid,
            version,
            ..
        } = incoming.packet.header;
        let server_config =
            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());

        // Give up if the Initial has been waiting for at least the configured idle timeout
        if server_config
            .transport
            .max_idle_timeout
            .is_some_and(|timeout| {
                incoming.received_at + Duration::from_millis(timeout.into()) <= now
            })
        {
            debug!("abandoning accept of stale initial");
            self.index.remove_initial(dst_cid);
            return Err(AcceptError {
                cause: ConnectionError::TimedOut,
                response: None,
            });
        }

        if self.cids_exhausted() {
            debug!("refusing connection");
            self.index.remove_initial(dst_cid);
            return Err(AcceptError {
                cause: ConnectionError::CidsExhausted,
                response: Some(self.initial_close(
                    version,
                    incoming.addresses,
                    &incoming.crypto,
                    &src_cid,
                    TransportError::CONNECTION_REFUSED(""),
                    buf,
                )),
            });
        }

        // Decrypt the Initial's payload in place; a failure is reported without a response
        if incoming
            .crypto
            .packet
            .remote
            .decrypt(
                packet_number,
                &incoming.packet.header_data,
                &mut incoming.packet.payload,
            )
            .is_err()
        {
            debug!(packet_number, "failed to authenticate initial packet");
            self.index.remove_initial(dst_cid);
            return Err(AcceptError {
                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
                response: None,
            });
        };

        // Reserve the handle the connection will be stored under; `add_connection` fills the
        // slot and checks that the key still matches.
        let ch = ConnectionHandle(self.connections.vacant_key());
        let loc_cid = self.new_cid(ch);
        let mut params = TransportParameters::new(
            &server_config.transport,
            &self.config,
            self.local_cid_generator.as_ref(),
            loc_cid,
            Some(&server_config),
            &mut self.rng,
        );
        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
        params.retry_src_cid = incoming.token.retry_src_cid;
        // A preferred address needs its own CID and reset token
        let mut pref_addr_cid = None;
        if server_config.has_preferred_address() {
            let cid = self.new_cid(ch);
            pref_addr_cid = Some(cid);
            params.preferred_address = Some(PreferredAddress {
                address_v4: server_config.preferred_address_v4,
                address_v6: server_config.preferred_address_v6,
                connection_id: cid,
                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
            });
        }

        let tls = server_config.crypto.clone().start_session(version, &params);
        let transport_config = server_config.transport.clone();
        let mut conn = self.add_connection(
            ch,
            version,
            dst_cid,
            loc_cid,
            src_cid,
            incoming.addresses,
            incoming.received_at,
            tls,
            transport_config,
            SideArgs::Server {
                server_config,
                pref_addr_cid,
                path_validated: remote_address_validated,
            },
        );
        // Re-point the initial DCID at the new connection
        self.index.insert_initial(dst_cid, ch);

        match conn.handle_first_packet(
            incoming.received_at,
            incoming.addresses.remote,
            incoming.ecn,
            packet_number,
            incoming.packet,
            incoming.rest,
        ) {
            Ok(()) => {
                trace!(id = ch.0, icid = %dst_cid, "new connection");

                // Replay the datagrams that were buffered while the attempt was pending
                for event in incoming_buffer.datagrams {
                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
                }

                Ok((ch, conn))
            }
            Err(e) => {
                debug!("handshake failed: {}", e);
                // Report the just-registered connection as drained to our own event handler
                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
                // Only transport errors are answered with a close packet
                let response = match e {
                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
                        version,
                        incoming.addresses,
                        &incoming.crypto,
                        &src_cid,
                        e.clone(),
                        buf,
                    )),
                    _ => None,
                };
                Err(AcceptError { cause: e, response })
            }
        }
    }
1185
1186    /// Check if we should refuse a connection attempt regardless of the packet's contents
1187    fn early_validate_first_packet(
1188        &mut self,
1189        header: &ProtectedInitialHeader,
1190    ) -> Result<(), TransportError> {
1191        let config = &self.server_config.as_ref().unwrap();
1192        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1193            return Err(TransportError::CONNECTION_REFUSED(""));
1194        }
1195
1196        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
1197        // bytes. If this is a Retry packet, then the length must instead match our usual CID
1198        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
1199        // also need to validate CID length for those after decoding the token.
1200        if header.dst_cid.len() < 8
1201            && (header.token_pos.is_empty()
1202                || header.dst_cid.len() != self.local_cid_generator.cid_len())
1203        {
1204            debug!(
1205                "rejecting connection due to invalid DCID length {}",
1206                header.dst_cid.len()
1207            );
1208            return Err(TransportError::PROTOCOL_VIOLATION(
1209                "invalid destination CID length",
1210            ));
1211        }
1212
1213        Ok(())
1214    }
1215
1216    /// Reject this incoming connection attempt
1217    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1218        self.clean_up_incoming(&incoming);
1219        incoming.improper_drop_warner.dismiss();
1220
1221        self.initial_close(
1222            incoming.packet.header.version,
1223            incoming.addresses,
1224            &incoming.crypto,
1225            &incoming.packet.header.src_cid,
1226            TransportError::CONNECTION_REFUSED(""),
1227            buf,
1228        )
1229    }
1230
1231    /// Respond with a retry packet, requiring the client to retry with address validation
1232    ///
1233    /// Errors if `incoming.may_retry()` is false.
1234    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1235        if !incoming.may_retry() {
1236            return Err(RetryError(Box::new(incoming)));
1237        }
1238
1239        self.clean_up_incoming(&incoming);
1240        incoming.improper_drop_warner.dismiss();
1241
1242        let server_config = self.server_config.as_ref().unwrap();
1243
1244        // First Initial
1245        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
1246        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
1247        // with established connections. In the unlikely event that a collision occurs
1248        // between two connections in the initial phase, both will fail fast and may be
1249        // retried by the application layer.
1250        let loc_cid = self.local_cid_generator.generate_cid();
1251
1252        let payload = TokenPayload::Retry {
1253            address: incoming.addresses.remote,
1254            orig_dst_cid: incoming.packet.header.dst_cid,
1255            issued: server_config.time_source.now(),
1256        };
1257        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1258
1259        let header = Header::Retry {
1260            src_cid: loc_cid,
1261            dst_cid: incoming.packet.header.src_cid,
1262            version: incoming.packet.header.version,
1263        };
1264
1265        let encode = header.encode(buf);
1266        buf.put_slice(&token);
1267        buf.extend_from_slice(&server_config.crypto.retry_tag(
1268            incoming.packet.header.version,
1269            &incoming.packet.header.dst_cid,
1270            buf,
1271        ));
1272        encode.finish(buf, &*incoming.crypto.header.local, None);
1273
1274        Ok(Transmit {
1275            destination: incoming.addresses.remote,
1276            ecn: None,
1277            size: buf.len(),
1278            segment_size: None,
1279            src_ip: incoming.addresses.local_ip,
1280        })
1281    }
1282
1283    /// Ignore this incoming connection attempt, not sending any packet in response
1284    ///
1285    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
1286    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
1287    pub fn ignore(&mut self, incoming: Incoming) {
1288        self.clean_up_incoming(&incoming);
1289        incoming.improper_drop_warner.dismiss();
1290    }
1291
1292    /// Clean up endpoint data structures associated with an `Incoming`.
1293    fn clean_up_incoming(&mut self, incoming: &Incoming) {
1294        self.index.remove_initial(incoming.packet.header.dst_cid);
1295        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1296        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1297    }
1298
1299    fn add_connection(
1300        &mut self,
1301        ch: ConnectionHandle,
1302        version: u32,
1303        init_cid: ConnectionId,
1304        loc_cid: ConnectionId,
1305        rem_cid: ConnectionId,
1306        addresses: FourTuple,
1307        now: Instant,
1308        tls: Box<dyn crypto::Session>,
1309        transport_config: Arc<TransportConfig>,
1310        side_args: SideArgs,
1311    ) -> Connection {
1312        let mut rng_seed = [0; 32];
1313        self.rng.fill_bytes(&mut rng_seed);
1314        let side = side_args.side();
1315        let pref_addr_cid = side_args.pref_addr_cid();
1316        let conn = Connection::new(
1317            self.config.clone(),
1318            transport_config,
1319            init_cid,
1320            loc_cid,
1321            rem_cid,
1322            addresses.remote,
1323            addresses.local_ip,
1324            tls,
1325            self.local_cid_generator.as_ref(),
1326            now,
1327            version,
1328            self.allow_mtud,
1329            rng_seed,
1330            side_args,
1331        );
1332
1333        let mut cids_issued = 0;
1334        let mut loc_cids = FxHashMap::default();
1335
1336        loc_cids.insert(cids_issued, loc_cid);
1337        cids_issued += 1;
1338
1339        if let Some(cid) = pref_addr_cid {
1340            debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1341            loc_cids.insert(cids_issued, cid);
1342            cids_issued += 1;
1343        }
1344
1345        let id = self.connections.insert(ConnectionMeta {
1346            init_cid,
1347            cids_issued,
1348            loc_cids,
1349            addresses,
1350            side,
1351            reset_token: None,
1352            peer_id: None,
1353        });
1354        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1355
1356        self.index.insert_conn(addresses, loc_cid, ch, side);
1357
1358        conn
1359    }
1360
1361    fn initial_close(
1362        &mut self,
1363        version: u32,
1364        addresses: FourTuple,
1365        crypto: &Keys,
1366        remote_id: &ConnectionId,
1367        reason: TransportError,
1368        buf: &mut Vec<u8>,
1369    ) -> Transmit {
1370        // We don't need to worry about CID collisions in initial closes because the peer
1371        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
1372        // unexpected response.
1373        let local_id = self.local_cid_generator.generate_cid();
1374        let number = PacketNumber::U8(0);
1375        let header = Header::Initial(InitialHeader {
1376            dst_cid: *remote_id,
1377            src_cid: local_id,
1378            number,
1379            token: Bytes::new(),
1380            version,
1381        });
1382
1383        let partial_encode = header.encode(buf);
1384        let max_len =
1385            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1386        frame::Close::from(reason).encode(buf, max_len);
1387        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1388        partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1389        Transmit {
1390            destination: addresses.remote,
1391            ecn: None,
1392            size: buf.len(),
1393            segment_size: None,
1394            src_ip: addresses.local_ip,
1395        }
1396    }
1397
1398    /// Access the configuration used by this endpoint
1399    pub fn config(&self) -> &EndpointConfig {
1400        &self.config
1401    }
1402
1403    /// Enable or disable address discovery for this endpoint
1404    ///
1405    /// Address discovery is enabled by default. When enabled, the endpoint will:
1406    /// - Send OBSERVED_ADDRESS frames to peers to inform them of their reflexive addresses
1407    /// - Process received OBSERVED_ADDRESS frames to learn about its own reflexive addresses
1408    /// - Integrate discovered addresses with NAT traversal for improved connectivity
1409    pub fn enable_address_discovery(&mut self, enabled: bool) {
1410        self.address_discovery_enabled = enabled;
1411        // Note: Existing connections will continue with their current setting.
1412        // New connections will use the updated setting.
1413    }
1414
1415    /// Check if address discovery is enabled
1416    pub fn address_discovery_enabled(&self) -> bool {
1417        self.address_discovery_enabled
1418    }
1419
1420    /// Get all discovered addresses across all connections
1421    ///
1422    /// Returns a list of unique socket addresses that have been observed
1423    /// by remote peers and reported via OBSERVED_ADDRESS frames.
1424    ///
1425    /// Note: This returns an empty vector in the current implementation.
1426    /// Applications should track discovered addresses at the connection level.
1427    pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1428        // TODO: Implement address tracking at the endpoint level
1429        Vec::new()
1430    }
1431
1432    /// Set a callback to be invoked when an address change is detected
1433    ///
1434    /// The callback receives the old address (if any) and the new address.
1435    /// Only one callback can be set at a time; setting a new callback replaces the previous one.
1436    pub fn set_address_change_callback<F>(&mut self, callback: F)
1437    where
1438        F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1439    {
1440        self.address_change_callback = Some(Box::new(callback));
1441    }
1442
1443    /// Clear the address change callback
1444    pub fn clear_address_change_callback(&mut self) {
1445        self.address_change_callback = None;
1446    }
1447
1448    /// Get address discovery statistics
1449    ///
1450    /// Note: This returns default statistics in the current implementation.
1451    /// Applications should track statistics at the connection level.
1452    pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1453        // TODO: Implement statistics tracking at the endpoint level
1454        AddressDiscoveryStats::default()
1455    }
1456
1457    /// Number of connections that are currently open
1458    pub fn open_connections(&self) -> usize {
1459        self.connections.len()
1460    }
1461
1462    /// Counter for the number of bytes currently used
1463    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
1464    pub fn incoming_buffer_bytes(&self) -> u64 {
1465        self.all_incoming_buffers_total_bytes
1466    }
1467
1468    #[cfg(test)]
1469    pub(crate) fn known_connections(&self) -> usize {
1470        let x = self.connections.len();
1471        debug_assert_eq!(x, self.index.connection_ids_initial.len());
1472        // Not all connections have known reset tokens
1473        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1474        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
1475        debug_assert!(x >= self.index.incoming_connection_remotes.len());
1476        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1477        x
1478    }
1479
1480    #[cfg(test)]
1481    pub(crate) fn known_cids(&self) -> usize {
1482        self.index.connection_ids.len()
1483    }
1484
1485    /// Whether we've used up 3/4 of the available CID space
1486    ///
1487    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
1488    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
1489    fn cids_exhausted(&self) -> bool {
1490        self.local_cid_generator.cid_len() <= 4
1491            && self.local_cid_generator.cid_len() != 0
1492            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1493                - self.index.connection_ids.len())
1494                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1495    }
1496}
1497
1498impl fmt::Debug for Endpoint {
1499    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1500        fmt.debug_struct("Endpoint")
1501            .field("rng", &self.rng)
1502            .field("index", &self.index)
1503            .field("connections", &self.connections)
1504            .field("config", &self.config)
1505            .field("server_config", &self.server_config)
1506            // incoming_buffers too large
1507            .field("incoming_buffers.len", &self.incoming_buffers.len())
1508            .field(
1509                "all_incoming_buffers_total_bytes",
1510                &self.all_incoming_buffers_total_bytes,
1511            )
1512            .finish()
1513    }
1514}
1515
/// Buffered Initial and 0-RTT messages for a pending incoming connection
///
/// One buffer is created per pending connection attempt and removed again when the attempt is
/// accepted, refused, retried, or ignored.
#[derive(Default)]
struct IncomingBuffer {
    /// Datagram events held back for the attempt; fed to the connection once it is accepted
    datagrams: Vec<DatagramConnectionEvent>,
    /// Amount subtracted from the endpoint-wide `all_incoming_buffers_total_bytes` counter when
    /// this buffer is removed (presumably the combined size of `datagrams` - confirm at the
    /// place where datagrams are buffered)
    total_bytes: u64,
}
1522
/// Part of protocol state incoming datagrams can be routed to
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// A pending connection attempt, identified by its key in the endpoint's incoming buffers
    Incoming(usize),
    /// A connection registered with the endpoint, identified by its handle
    Connection(ConnectionHandle),
}
1529
/// Maps packets to existing connections
///
/// Zero-length connection IDs are never stored in the CID-keyed tables; connections using them
/// are tracked by address in the `*_connection_remotes` tables instead.
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Identifies connections based on the initial DCID the peer utilized
    ///
    /// An entry points either at a pending incoming attempt or at a registered connection.
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    ///
    /// Used by the server, not the client.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Identifies connections based on locally created CIDs
    ///
    /// Uses a cheaper hash function since keys are locally created
    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
    /// Identifies incoming connections with zero-length CIDs
    ///
    /// Keyed by the full four-tuple.
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Identifies outgoing connections with zero-length CIDs
    ///
    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
    /// may be established per remote. We must omit the local address from the key because we don't
    /// necessarily know what address we're sending from, and hence receiving at.
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens provided by the peer for the CID each connection is currently sending to
    ///
    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
    /// recipient, if any.
    connection_reset_tokens: ResetTokenTable,
}
1562
1563impl ConnectionIndex {
1564    /// Associate an incoming connection with its initial destination CID
1565    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1566        if dst_cid.is_empty() {
1567            return;
1568        }
1569        self.connection_ids_initial
1570            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1571    }
1572
1573    /// Remove an association with an initial destination CID
1574    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1575        if dst_cid.is_empty() {
1576            return;
1577        }
1578        let removed = self.connection_ids_initial.remove(&dst_cid);
1579        debug_assert!(removed.is_some());
1580    }
1581
1582    /// Associate a connection with its initial destination CID
1583    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1584        if dst_cid.is_empty() {
1585            return;
1586        }
1587        self.connection_ids_initial
1588            .insert(dst_cid, RouteDatagramTo::Connection(connection));
1589    }
1590
1591    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1592    /// its current 4-tuple
1593    fn insert_conn(
1594        &mut self,
1595        addresses: FourTuple,
1596        dst_cid: ConnectionId,
1597        connection: ConnectionHandle,
1598        side: Side,
1599    ) {
1600        match dst_cid.len() {
1601            0 => match side {
1602                Side::Server => {
1603                    self.incoming_connection_remotes
1604                        .insert(addresses, connection);
1605                }
1606                Side::Client => {
1607                    self.outgoing_connection_remotes
1608                        .insert(addresses.remote, connection);
1609                }
1610            },
1611            _ => {
1612                self.connection_ids.insert(dst_cid, connection);
1613            }
1614        }
1615    }
1616
1617    /// Discard a connection ID
1618    fn retire(&mut self, dst_cid: ConnectionId) {
1619        self.connection_ids.remove(&dst_cid);
1620    }
1621
1622    /// Remove all references to a connection
1623    fn remove(&mut self, conn: &ConnectionMeta) {
1624        if conn.side.is_server() {
1625            self.remove_initial(conn.init_cid);
1626        }
1627        for cid in conn.loc_cids.values() {
1628            self.connection_ids.remove(cid);
1629        }
1630        self.incoming_connection_remotes.remove(&conn.addresses);
1631        self.outgoing_connection_remotes
1632            .remove(&conn.addresses.remote);
1633        if let Some((remote, token)) = conn.reset_token {
1634            self.connection_reset_tokens.remove(remote, token);
1635        }
1636    }
1637
1638    /// Find the existing connection that `datagram` should be routed to, if any
1639    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1640        let dst_cid = datagram.dst_cid();
1641        let is_empty_cid = dst_cid.is_empty();
1642
1643        // Fast path: Try most common lookup first (non-empty CID)
1644        if !is_empty_cid {
1645            if let Some(&ch) = self.connection_ids.get(dst_cid) {
1646                return Some(RouteDatagramTo::Connection(ch));
1647            }
1648        }
1649
1650        // Initial/0RTT packet lookup
1651        if datagram.is_initial() || datagram.is_0rtt() {
1652            if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1653                return Some(ch);
1654            }
1655        }
1656
1657        // Empty CID lookup (less common, do after fast path)
1658        if is_empty_cid {
1659            // Check incoming connections first (servers handle more incoming)
1660            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1661                return Some(RouteDatagramTo::Connection(ch));
1662            }
1663            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1664                return Some(RouteDatagramTo::Connection(ch));
1665            }
1666        }
1667
1668        // Stateless reset token lookup (least common, do last)
1669        let data = datagram.data();
1670        if data.len() < RESET_TOKEN_SIZE {
1671            return None;
1672        }
1673        self.connection_reset_tokens
1674            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1675            .cloned()
1676            .map(RouteDatagramTo::Connection)
1677    }
1678}
1679
1680#[derive(Debug)]
1681pub(crate) struct ConnectionMeta {
1682    init_cid: ConnectionId,
1683    /// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
1684    cids_issued: u64,
1685    loc_cids: FxHashMap<u64, ConnectionId>,
1686    /// Remote/local addresses the connection began with
1687    ///
1688    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1689    /// bother keeping it up to date.
1690    addresses: FourTuple,
1691    side: Side,
1692    /// Reset token provided by the peer for the CID we're currently sending to, and the address
1693    /// being sent to
1694    reset_token: Option<(SocketAddr, ResetToken)>,
1695    /// Peer ID for this connection, used for relay functionality
1696    peer_id: Option<PeerId>,
1697}
1698
/// Opaque handle naming a `Connection` that is currently associated with an endpoint
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionHandle(pub usize);

impl From<ConnectionHandle> for usize {
    /// Unwrap the handle into the raw index it carries
    fn from(ConnectionHandle(index): ConnectionHandle) -> Self {
        index
    }
}
1708
1709impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1710    type Output = ConnectionMeta;
1711    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1712        &self[ch.0]
1713    }
1714}
1715
1716impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1717    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1718        &mut self[ch.0]
1719    }
1720}
1721
1722/// Event resulting from processing a single datagram
1723pub enum DatagramEvent {
1724    /// The datagram is redirected to its `Connection`
1725    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1726    /// The datagram may result in starting a new `Connection`
1727    NewConnection(Incoming),
1728    /// Response generated directly by the endpoint
1729    Response(Transmit),
1730}
1731
1732/// An incoming connection for which the server has not yet begun its part of the handshake.
1733pub struct Incoming {
1734    received_at: Instant,
1735    addresses: FourTuple,
1736    ecn: Option<EcnCodepoint>,
1737    packet: InitialPacket,
1738    rest: Option<BytesMut>,
1739    crypto: Keys,
1740    token: IncomingToken,
1741    incoming_idx: usize,
1742    improper_drop_warner: IncomingImproperDropWarner,
1743}
1744
1745impl Incoming {
1746    /// The local IP address which was used when the peer established the connection
1747    ///
1748    /// This has the same behavior as [`Connection::local_ip`].
1749    pub fn local_ip(&self) -> Option<IpAddr> {
1750        self.addresses.local_ip
1751    }
1752
1753    /// The peer's UDP address
1754    pub fn remote_address(&self) -> SocketAddr {
1755        self.addresses.remote
1756    }
1757
1758    /// Whether the socket address that is initiating this connection has been validated
1759    ///
1760    /// This means that the sender of the initial packet has proved that they can receive traffic
1761    /// sent to `self.remote_address()`.
1762    ///
1763    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1764    /// The inverse is not guaranteed.
1765    pub fn remote_address_validated(&self) -> bool {
1766        self.token.validated
1767    }
1768
1769    /// Whether it is legal to respond with a retry packet
1770    ///
1771    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1772    /// The inverse is not guaranteed.
1773    pub fn may_retry(&self) -> bool {
1774        self.token.retry_src_cid.is_none()
1775    }
1776
1777    /// The original destination connection ID sent by the client
1778    pub fn orig_dst_cid(&self) -> &ConnectionId {
1779        &self.token.orig_dst_cid
1780    }
1781}
1782
1783impl fmt::Debug for Incoming {
1784    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1785        f.debug_struct("Incoming")
1786            .field("addresses", &self.addresses)
1787            .field("ecn", &self.ecn)
1788            // packet doesn't implement debug
1789            // rest is too big and not meaningful enough
1790            .field("token", &self.token)
1791            .field("incoming_idx", &self.incoming_idx)
1792            // improper drop warner contains no information
1793            .finish_non_exhaustive()
1794    }
1795}
1796
/// Marker whose `Drop` implementation logs a warning unless `dismiss` was called first
struct IncomingImproperDropWarner;

impl IncomingImproperDropWarner {
    /// Consume the warner without running its destructor, so no warning is logged
    fn dismiss(self) {
        // Forgetting the value skips `Drop::drop`
        mem::forget(self)
    }
}
1804
1805impl Drop for IncomingImproperDropWarner {
1806    fn drop(&mut self) {
1807        warn!(
1808            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1809               (may cause memory leak and eventual inability to accept new connections)"
1810        );
1811    }
1812}
1813
1814/// Errors in the parameters being used to create a new connection
1815///
1816/// These arise before any I/O has been performed.
1817#[derive(Debug, Error, Clone, PartialEq, Eq)]
1818pub enum ConnectError {
1819    /// The endpoint can no longer create new connections
1820    ///
1821    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1822    #[error("endpoint stopping")]
1823    EndpointStopping,
1824    /// The connection could not be created because not enough of the CID space is available
1825    ///
1826    /// Try using longer connection IDs
1827    #[error("CIDs exhausted")]
1828    CidsExhausted,
1829    /// The given server name was malformed
1830    #[error("invalid server name: {0}")]
1831    InvalidServerName(String),
1832    /// The remote [`SocketAddr`] supplied was malformed
1833    ///
1834    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1835    #[error("invalid remote address: {0}")]
1836    InvalidRemoteAddress(SocketAddr),
1837    /// No default client configuration was set up
1838    ///
1839    /// Use `Endpoint::connect_with` to specify a client configuration.
1840    #[error("no default client config")]
1841    NoDefaultClientConfig,
1842    /// The local endpoint does not support the QUIC version specified in the client configuration
1843    #[error("unsupported QUIC version")]
1844    UnsupportedVersion,
1845}
1846
1847/// Error type for attempting to accept an [`Incoming`]
1848#[derive(Debug)]
1849pub struct AcceptError {
1850    /// Underlying error describing reason for failure
1851    pub cause: ConnectionError,
1852    /// Optional response to transmit back
1853    pub response: Option<Transmit>,
1854}
1855
1856/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1857#[derive(Debug, Error)]
1858#[error("retry() with validated Incoming")]
1859pub struct RetryError(Box<Incoming>);
1860
1861impl RetryError {
1862    /// Get the [`Incoming`]
1863    pub fn into_incoming(self) -> Incoming {
1864        *self.0
1865    }
1866}
1867
1868/// Reset Tokens which are associated with peer socket addresses
1869///
1870/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1871/// peer generated and might be usable for hash collision attacks.
1872#[derive(Default, Debug)]
1873struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1874
1875impl ResetTokenTable {
1876    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1877        self.0
1878            .entry(remote)
1879            .or_default()
1880            .insert(token, ch)
1881            .is_some()
1882    }
1883
1884    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1885        use std::collections::hash_map::Entry;
1886        match self.0.entry(remote) {
1887            Entry::Vacant(_) => {}
1888            Entry::Occupied(mut e) => {
1889                e.get_mut().remove(&token);
1890                if e.get().is_empty() {
1891                    e.remove_entry();
1892                }
1893            }
1894        }
1895    }
1896
1897    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1898        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1899        self.0.get(&remote)?.get(&token)
1900    }
1901}
1902
/// Key identifying a connection by its remote address together with the local IP
///
/// The local IP is part of the key so that hosts with several IP addresses on the same subnet
/// behave well when zero-length connection IDs are in use.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct FourTuple {
    remote: SocketAddr,
    // The local port is not stored: a single socket can only listen on a single port
    local_ip: Option<IpAddr>,
}