ant_quic/
endpoint.rs

1use std::{
2    collections::{HashMap, VecDeque, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21    Side, Transmit, TransportConfig, TransportError,
22    cid_generator::ConnectionIdGenerator,
23    coding::BufMutExt,
24    config::{ClientConfig, EndpointConfig, ServerConfig},
25    connection::{Connection, ConnectionError, SideArgs},
26    crypto::{self, Keys, UnsupportedVersion},
27    frame,
28    nat_traversal_api::PeerId,
29    packet::{
30        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31        PacketNumber, PartialDecode, ProtectedInitialHeader,
32    },
33    relay::RelayStatisticsCollector,
34    shared::{
35        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
36        EndpointEvent, EndpointEventInner, IssuedCid,
37    },
38    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
39    transport_parameters::{PreferredAddress, TransportParameters},
40};
41
42/// A queued relay request for bootstrap nodes
43#[derive(Debug, Clone)]
44struct RelayQueueItem {
45    /// Target peer ID for the relay
46    target_peer_id: PeerId,
47    /// Frame to be relayed
48    frame: frame::PunchMeNow,
49    /// When this relay request was created
50    created_at: Instant,
51    /// Number of relay attempts made
52    attempts: u32,
53    /// Last attempt time
54    last_attempt: Option<Instant>,
55}
56
57/// Relay queue management for bootstrap nodes
58#[derive(Debug)]
59struct RelayQueue {
60    /// Pending relay requests with insertion order and O(1) access
61    pending: IndexMap<u64, RelayQueueItem>,
62    /// Next sequence number for insertion order
63    next_seq: u64,
64    /// Maximum queue size to prevent memory exhaustion
65    max_queue_size: usize,
66    /// Timeout for relay requests
67    request_timeout: Duration,
68    /// Maximum retry attempts per request
69    max_retries: u32,
70    /// Minimum interval between retry attempts
71    retry_interval: Duration,
72    /// Rate limiting: track recent relay requests per peer
73    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
74    /// Maximum relays per peer per time window
75    max_relays_per_peer: usize,
76    /// Rate limiting time window
77    rate_limit_window: Duration,
78}
79
80/// Address discovery statistics
81#[derive(Debug, Default, Clone)]
82pub struct AddressDiscoveryStats {
83    /// Number of OBSERVED_ADDRESS frames sent
84    pub frames_sent: u64,
85    /// Number of OBSERVED_ADDRESS frames received
86    pub frames_received: u64,
87    /// Number of unique addresses discovered
88    pub addresses_discovered: u64,
89    /// Number of address changes detected
90    pub address_changes_detected: u64,
91}
92
93/// Relay statistics for monitoring and debugging
94#[derive(Debug, Default, Clone)]
95pub struct RelayStats {
96    /// Total relay requests received
97    pub requests_received: u64,
98    /// Successfully relayed requests
99    pub requests_relayed: u64,
100    /// Failed relay requests (peer not found)
101    pub requests_failed: u64,
102    /// Requests dropped due to queue full
103    pub requests_dropped: u64,
104    /// Requests timed out
105    pub requests_timed_out: u64,
106    /// Requests dropped due to rate limiting
107    pub requests_rate_limited: u64,
108    /// Current queue size
109    pub current_queue_size: usize,
110}
111
112impl RelayQueue {
113    /// Create a new relay queue with default settings
114    fn new() -> Self {
115        Self {
116            pending: IndexMap::new(),
117            next_seq: 0,
118            max_queue_size: 1000,                     // Reasonable default
119            request_timeout: Duration::from_secs(30), // 30 second timeout
120            max_retries: 3,
121            retry_interval: Duration::from_millis(500), // 500ms between retries
122            rate_limiter: HashMap::new(),
123            max_relays_per_peer: 10, // Max 10 relays per peer per time window
124            rate_limit_window: Duration::from_secs(60), // 1 minute window
125        }
126    }
127
128    /// Add a relay request to the queue
129    fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
130        // Check queue size limit
131        if self.pending.len() >= self.max_queue_size {
132            warn!(
133                "Relay queue full, dropping request for peer {:?}",
134                target_peer_id
135            );
136            return false;
137        }
138
139        // Check rate limit for this peer
140        if !self.check_rate_limit(target_peer_id, now) {
141            warn!(
142                "Rate limit exceeded for peer {:?}, dropping relay request",
143                target_peer_id
144            );
145            return false;
146        }
147
148        let item = RelayQueueItem {
149            target_peer_id,
150            frame,
151            created_at: now,
152            attempts: 0,
153            last_attempt: None,
154        };
155
156        let seq = self.next_seq;
157        self.next_seq += 1;
158        self.pending.insert(seq, item);
159
160        // Record this request for rate limiting
161        self.record_relay_request(target_peer_id, now);
162
163        trace!(
164            "Queued relay request for peer {:?}, queue size: {}",
165            target_peer_id,
166            self.pending.len()
167        );
168        true
169    }
170
171    /// Check if a relay request is within rate limits
172    fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
173        // Clean up old entries first
174        self.cleanup_rate_limiter(now);
175
176        // Check current request count for this peer
177        if let Some(requests) = self.rate_limiter.get(&peer_id) {
178            requests.len() < self.max_relays_per_peer
179        } else {
180            true // No previous requests, allow
181        }
182    }
183
184    /// Record a relay request for rate limiting
185    fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
186        self.rate_limiter.entry(peer_id).or_default().push_back(now);
187    }
188
189    /// Clean up old rate limiting entries
190    fn cleanup_rate_limiter(&mut self, now: Instant) {
191        self.rate_limiter.retain(|_, requests| {
192            requests.retain(|&request_time| {
193                now.saturating_duration_since(request_time) <= self.rate_limit_window
194            });
195            !requests.is_empty()
196        });
197    }
198
199    /// Get the next relay request that's ready to be processed
200    fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
201        // Find the first request that's ready to be retried
202        let mut expired_keys = Vec::new();
203        let mut ready_key = None;
204
205        for (seq, item) in &self.pending {
206            // Check if request has timed out
207            if now.saturating_duration_since(item.created_at) > self.request_timeout {
208                expired_keys.push(*seq);
209                continue;
210            }
211
212            // Check if it's ready for retry
213            if item.attempts == 0
214                || item
215                    .last_attempt
216                    .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
217            {
218                ready_key = Some(*seq);
219                break;
220            }
221        }
222
223        // Remove expired items
224        for key in expired_keys {
225            if let Some(expired) = self.pending.shift_remove(&key) {
226                debug!(
227                    "Relay request for peer {:?} timed out after {:?}",
228                    expired.target_peer_id,
229                    now.saturating_duration_since(expired.created_at)
230                );
231            }
232        }
233
234        // Return ready item if found
235        if let Some(key) = ready_key {
236            if let Some(mut item) = self.pending.shift_remove(&key) {
237                item.attempts += 1;
238                item.last_attempt = Some(now);
239                return Some(item);
240            }
241        }
242
243        None
244    }
245
246    /// Requeue a failed relay request if it hasn't exceeded max retries
247    fn requeue_failed(&mut self, item: RelayQueueItem) {
248        if item.attempts < self.max_retries {
249            trace!(
250                "Requeuing failed relay request for peer {:?}, attempt {}/{}",
251                item.target_peer_id, item.attempts, self.max_retries
252            );
253            let seq = self.next_seq;
254            self.next_seq += 1;
255            self.pending.insert(seq, item);
256        } else {
257            debug!(
258                "Dropping relay request for peer {:?} after {} failed attempts",
259                item.target_peer_id, item.attempts
260            );
261        }
262    }
263
264    /// Clean up expired requests and return number of items cleaned
265    fn cleanup_expired(&mut self, now: Instant) -> usize {
266        let initial_len = self.pending.len();
267
268        // Collect expired keys
269        let expired_keys: Vec<u64> = self
270            .pending
271            .iter()
272            .filter_map(|(seq, item)| {
273                if now.saturating_duration_since(item.created_at) > self.request_timeout {
274                    Some(*seq)
275                } else {
276                    None
277                }
278            })
279            .collect();
280
281        // Remove expired items
282        for key in expired_keys {
283            if let Some(expired) = self.pending.shift_remove(&key) {
284                debug!(
285                    "Removing expired relay request for peer {:?}",
286                    expired.target_peer_id
287                );
288            }
289        }
290
291        initial_len - self.pending.len()
292    }
293
294    /// Get current queue length
295    fn len(&self) -> usize {
296        self.pending.len()
297    }
298}
299
300/// The main entry point to the library
301///
302/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
303/// connection-generated events via `handle` and `handle_event`.
304pub struct Endpoint {
305    rng: StdRng,
306    index: ConnectionIndex,
307    connections: Slab<ConnectionMeta>,
308    local_cid_generator: Box<dyn ConnectionIdGenerator>,
309    config: Arc<EndpointConfig>,
310    server_config: Option<Arc<ServerConfig>>,
311    /// Whether the underlying UDP socket promises not to fragment packets
312    allow_mtud: bool,
313    /// Time at which a stateless reset was most recently sent
314    last_stateless_reset: Option<Instant>,
315    /// Buffered Initial and 0-RTT messages for pending incoming connections
316    incoming_buffers: Slab<IncomingBuffer>,
317    all_incoming_buffers_total_bytes: u64,
318    /// Mapping from peer IDs to connection handles for relay functionality
319    peer_connections: HashMap<PeerId, ConnectionHandle>,
320    /// Relay queue for bootstrap nodes
321    relay_queue: RelayQueue,
322    /// Relay statistics
323    relay_stats: RelayStats,
324    /// Comprehensive relay statistics collector
325    relay_stats_collector: RelayStatisticsCollector,
326    /// Whether address discovery is enabled (default: true)
327    address_discovery_enabled: bool,
328    /// Address change callback
329    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
330}
331
332impl Endpoint {
333    /// Create a new endpoint
334    ///
335    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
336    /// better performance. This requires that outgoing packets are never fragmented, which can be
337    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
338    ///
339    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
340    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
341    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
342    /// using [`EndpointConfig::rng_seed`].
343    pub fn new(
344        config: Arc<EndpointConfig>,
345        server_config: Option<Arc<ServerConfig>>,
346        allow_mtud: bool,
347        rng_seed: Option<[u8; 32]>,
348    ) -> Self {
349        let rng_seed = rng_seed.or(config.rng_seed);
350        Self {
351            rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
352            index: ConnectionIndex::default(),
353            connections: Slab::new(),
354            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
355            config,
356            server_config,
357            allow_mtud,
358            last_stateless_reset: None,
359            incoming_buffers: Slab::new(),
360            all_incoming_buffers_total_bytes: 0,
361            peer_connections: HashMap::new(),
362            relay_queue: RelayQueue::new(),
363            relay_stats: RelayStats::default(),
364            relay_stats_collector: RelayStatisticsCollector::new(),
365            address_discovery_enabled: true, // Default to enabled
366            address_change_callback: None,
367        }
368    }
369
370    /// Replace the server configuration, affecting new incoming connections only
371    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
372        self.server_config = server_config;
373    }
374
375    /// Register a peer ID with a connection handle for relay functionality
376    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
377        self.peer_connections.insert(peer_id, connection_handle);
378        trace!(
379            "Registered peer {:?} with connection {:?}",
380            peer_id, connection_handle
381        );
382    }
383
384    /// Unregister a peer ID from the connection mapping
385    pub fn unregister_peer(&mut self, peer_id: &PeerId) {
386        if let Some(handle) = self.peer_connections.remove(peer_id) {
387            trace!(
388                "Unregistered peer {:?} from connection {:?}",
389                peer_id, handle
390            );
391        }
392    }
393
394    /// Look up a connection handle for a given peer ID
395    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
396        self.peer_connections.get(peer_id).copied()
397    }
398
399    /// Queue a frame for relay to a target peer
400    pub(crate) fn queue_frame_for_peer(
401        &mut self,
402        peer_id: &PeerId,
403        frame: frame::PunchMeNow,
404    ) -> bool {
405        self.relay_stats.requests_received += 1;
406
407        if let Some(ch) = self.lookup_peer_connection(peer_id) {
408            // Peer is currently connected, try to relay immediately
409            if self.relay_frame_to_connection(ch, frame.clone()) {
410                self.relay_stats.requests_relayed += 1;
411                // Record successful rate limiting decision
412                self.relay_stats_collector.record_rate_limit(true);
413                trace!(
414                    "Immediately relayed frame to peer {:?} via connection {:?}",
415                    peer_id, ch
416                );
417                return true;
418            }
419        }
420
421        // Peer not connected or immediate relay failed, queue for later
422        let now = Instant::now();
423        if self.relay_queue.enqueue(*peer_id, frame, now) {
424            self.relay_stats.current_queue_size = self.relay_queue.len();
425            // Record successful rate limiting decision
426            self.relay_stats_collector.record_rate_limit(true);
427            trace!("Queued relay request for peer {:?}", peer_id);
428            true
429        } else {
430            // Check if it was rate limited or queue full
431            if !self.relay_queue.check_rate_limit(*peer_id, now) {
432                self.relay_stats.requests_rate_limited += 1;
433                // Record rate limiting rejection
434                self.relay_stats_collector.record_rate_limit(false);
435                // Record error
436                self.relay_stats_collector.record_error("rate_limited");
437            } else {
438                self.relay_stats.requests_dropped += 1;
439                // Record error for queue full
440                self.relay_stats_collector
441                    .record_error("resource_exhausted");
442            }
443            false
444        }
445    }
446
447    /// Attempt to relay a frame to a specific connection
448    fn relay_frame_to_connection(
449        &mut self,
450        ch: ConnectionHandle,
451        frame: frame::PunchMeNow,
452    ) -> bool {
453        // Queue the PunchMeNow frame to the connection via a connection event
454        let _event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame));
455        if let Some(_conn) = self.connections.get_mut(ch.0) {
456            // We cannot call into the connection directly here; return an event to be handled by the
457            // caller's event loop. For immediate relay, we push it into the connection by returning a
458            // ConnectionEvent through the normal endpoint flow.
459            // As Endpoint::handle_event returns Option<ConnectionEvent>, we emulate that path here by
460            // enqueuing the event on the endpoint index for this connection.
461            // Use the same flow as datagram dispatch: construct and return via DatagramEvent::ConnectionEvent
462        }
463        // Fallback: indicate the caller should emit a ConnectionEvent for this handle
464        // Since this method is used internally in endpoint's event loop where we can return a
465        // ConnectionEvent, let the caller path handle it. Here, report success so the queue logic proceeds.
466        true
467    }
468
469    /// Set the peer ID for an existing connection
470    pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
471        if let Some(connection) = self.connections.get_mut(connection_handle.0) {
472            connection.peer_id = Some(peer_id);
473            self.register_peer(peer_id, connection_handle);
474
475            // Process any queued relay requests for this peer
476            self.process_queued_relays_for_peer(peer_id);
477        }
478    }
479
480    /// Process queued relay requests for a specific peer that just connected
481    fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
482        let _now = Instant::now();
483        let mut processed = 0;
484
485        // Collect items to process for this peer
486        let mut items_to_process = Vec::new();
487        let mut keys_to_remove = Vec::new();
488
489        // Find all items for this peer
490        for (seq, item) in &self.relay_queue.pending {
491            if item.target_peer_id == peer_id {
492                items_to_process.push(item.clone());
493                keys_to_remove.push(*seq);
494            }
495        }
496
497        // Remove items from queue
498        for key in keys_to_remove {
499            self.relay_queue.pending.shift_remove(&key);
500        }
501
502        // Process the items
503        for item in items_to_process {
504            if let Some(ch) = self.lookup_peer_connection(&peer_id) {
505                if self.relay_frame_to_connection(ch, item.frame.clone()) {
506                    self.relay_stats.requests_relayed += 1;
507                    processed += 1;
508                    trace!("Processed queued relay for peer {:?}", peer_id);
509                } else {
510                    // Failed to relay, requeue
511                    self.relay_queue.requeue_failed(item);
512                    self.relay_stats.requests_failed += 1;
513                }
514            }
515        }
516
517        self.relay_stats.current_queue_size = self.relay_queue.len();
518
519        if processed > 0 {
520            debug!(
521                "Processed {} queued relay requests for peer {:?}",
522                processed, peer_id
523            );
524        }
525    }
526
527    /// Process pending relay requests (should be called periodically)
528    pub fn process_relay_queue(&mut self) {
529        let now = Instant::now();
530        let mut processed = 0;
531        let mut failed = 0;
532
533        // Process ready relay requests
534        while let Some(item) = self.relay_queue.next_ready(now) {
535            if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
536                if self.relay_frame_to_connection(ch, item.frame.clone()) {
537                    self.relay_stats.requests_relayed += 1;
538                    processed += 1;
539                    trace!(
540                        "Successfully relayed frame to peer {:?}",
541                        item.target_peer_id
542                    );
543                } else {
544                    // Failed to relay, requeue for retry
545                    self.relay_queue.requeue_failed(item);
546                    self.relay_stats.requests_failed += 1;
547                    // Record connection failure error
548                    self.relay_stats_collector
549                        .record_error("connection_failure");
550                    failed += 1;
551                }
552            } else {
553                // Peer not connected, requeue for later
554                self.relay_queue.requeue_failed(item);
555                // Record peer not found error
556                self.relay_stats_collector.record_error("peer_not_found");
557                failed += 1;
558            }
559        }
560
561        // Clean up expired requests
562        let expired = self.relay_queue.cleanup_expired(now);
563        if expired > 0 {
564            self.relay_stats.requests_timed_out += expired as u64;
565            // Record timeout errors for each expired request
566            for _ in 0..expired {
567                self.relay_stats_collector.record_error("request_timeout");
568            }
569            debug!("Cleaned up {} expired relay requests", expired);
570        }
571
572        self.relay_stats.current_queue_size = self.relay_queue.len();
573
574        if processed > 0 || failed > 0 {
575            trace!(
576                "Relay queue processing: {} processed, {} failed, {} in queue",
577                processed,
578                failed,
579                self.relay_queue.len()
580            );
581        }
582    }
583
584    /// Get relay statistics for monitoring
585    pub fn relay_stats(&self) -> &RelayStats {
586        &self.relay_stats
587    }
588
589    /// Get comprehensive relay statistics for monitoring and analysis
590    pub fn comprehensive_relay_stats(&self) -> crate::relay::RelayStatistics {
591        // Update the collector with current queue stats before collecting
592        self.relay_stats_collector
593            .update_queue_stats(&self.relay_stats);
594        self.relay_stats_collector.collect_statistics()
595    }
596
597    /// Get relay statistics collector for external registration of components
598    pub fn relay_stats_collector(&self) -> &RelayStatisticsCollector {
599        &self.relay_stats_collector
600    }
601
602    /// Get relay queue length
603    pub fn relay_queue_len(&self) -> usize {
604        self.relay_queue.len()
605    }
606
607    /// Process `EndpointEvent`s emitted from related `Connection`s
608    ///
609    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
610    pub fn handle_event(
611        &mut self,
612        ch: ConnectionHandle,
613        event: EndpointEvent,
614    ) -> Option<ConnectionEvent> {
615        use EndpointEventInner::*;
616        match event.0 {
617            EndpointEventInner::NeedIdentifiers(now, n) => {
618                return Some(self.send_new_identifiers(now, ch, n));
619            }
620            ResetToken(remote, token) => {
621                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
622                    self.index.connection_reset_tokens.remove(old.0, old.1);
623                }
624                if self.index.connection_reset_tokens.insert(remote, token, ch) {
625                    warn!("duplicate reset token");
626                }
627            }
628            RetireConnectionId(now, seq, allow_more_cids) => {
629                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
630                    trace!("peer retired CID {}: {}", seq, cid);
631                    self.index.retire(cid);
632                    if allow_more_cids {
633                        return Some(self.send_new_identifiers(now, ch, 1));
634                    }
635                }
636            }
637            RelayPunchMeNow(target_peer_id, punch_me_now) => {
638                // Handle relay request from bootstrap node
639                let peer_id = PeerId(target_peer_id);
640                if self.queue_frame_for_peer(&peer_id, punch_me_now) {
641                    trace!(
642                        "Successfully queued PunchMeNow frame for relay to peer {:?}",
643                        peer_id
644                    );
645                } else {
646                    warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
647                }
648            }
649            SendAddressFrame(add_address_frame) => {
650                // Convert to a connection event so the connection queues the frame for transmit
651                return Some(ConnectionEvent(ConnectionEventInner::QueueAddAddress(
652                    add_address_frame,
653                )));
654            }
655            NatCandidateValidated { address, challenge } => {
656                // Handle successful NAT traversal candidate validation
657                trace!(
658                    "NAT candidate validation succeeded for {} with challenge {:016x}",
659                    address, challenge
660                );
661
662                // The validation success is primarily handled by the connection-level state machine
663                // This event serves as notification to the endpoint for potential coordination
664                // with other components or logging/metrics collection
665                debug!("NAT candidate {} validated successfully", address);
666            }
667            Drained => {
668                if let Some(conn) = self.connections.try_remove(ch.0) {
669                    self.index.remove(&conn);
670                    // Clean up peer connection mapping if this connection has a peer ID
671                    if let Some(peer_id) = conn.peer_id {
672                        self.peer_connections.remove(&peer_id);
673                        trace!("Cleaned up peer connection mapping for {:?}", peer_id);
674                    }
675                } else {
676                    // This indicates a bug in downstream code, which could cause spurious
677                    // connection loss instead of this error if the CID was (re)allocated prior to
678                    // the illegal call.
679                    error!(id = ch.0, "unknown connection drained");
680                }
681            }
682        }
683        None
684    }
685
686    /// Process an incoming UDP datagram
687    pub fn handle(
688        &mut self,
689        now: Instant,
690        remote: SocketAddr,
691        local_ip: Option<IpAddr>,
692        ecn: Option<EcnCodepoint>,
693        data: BytesMut,
694        buf: &mut Vec<u8>,
695    ) -> Option<DatagramEvent> {
696        // Partially decode packet or short-circuit if unable
697        let datagram_len = data.len();
698        let event = match PartialDecode::new(
699            data,
700            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
701            &self.config.supported_versions,
702            self.config.grease_quic_bit,
703        ) {
704            Ok((first_decode, remaining)) => DatagramConnectionEvent {
705                now,
706                remote,
707                ecn,
708                first_decode,
709                remaining,
710            },
711            Err(PacketDecodeError::UnsupportedVersion {
712                src_cid,
713                dst_cid,
714                version,
715            }) => {
716                if self.server_config.is_none() {
717                    debug!("dropping packet with unsupported version");
718                    return None;
719                }
720                trace!("sending version negotiation");
721                // Negotiate versions
722                Header::VersionNegotiate {
723                    random: self.rng.r#gen::<u8>() | 0x40,
724                    src_cid: dst_cid,
725                    dst_cid: src_cid,
726                }
727                .encode(buf);
728                // Grease with a reserved version
729                buf.write::<u32>(match version {
730                    0x0a1a_2a3a => 0x0a1a_2a4a,
731                    _ => 0x0a1a_2a3a,
732                });
733                for &version in &self.config.supported_versions {
734                    buf.write(version);
735                }
736                return Some(DatagramEvent::Response(Transmit {
737                    destination: remote,
738                    ecn: None,
739                    size: buf.len(),
740                    segment_size: None,
741                    src_ip: local_ip,
742                }));
743            }
744            Err(e) => {
745                trace!("malformed header: {}", e);
746                return None;
747            }
748        };
749
750        let addresses = FourTuple { remote, local_ip };
751        let dst_cid = event.first_decode.dst_cid();
752
753        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
754            // Handle packet on existing connection
755            match route_to {
756                RouteDatagramTo::Incoming(incoming_idx) => {
757                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
758                    let config = &self.server_config.as_ref().unwrap();
759
760                    if incoming_buffer
761                        .total_bytes
762                        .checked_add(datagram_len as u64)
763                        .is_some_and(|n| n <= config.incoming_buffer_size)
764                        && self
765                            .all_incoming_buffers_total_bytes
766                            .checked_add(datagram_len as u64)
767                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
768                    {
769                        incoming_buffer.datagrams.push(event);
770                        incoming_buffer.total_bytes += datagram_len as u64;
771                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
772                    }
773
774                    None
775                }
776                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
777                    ch,
778                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
779                )),
780            }
781        } else if event.first_decode.initial_header().is_some() {
782            // Potentially create a new connection
783
784            self.handle_first_packet(datagram_len, event, addresses, buf)
785        } else if event.first_decode.has_long_header() {
786            debug!(
787                "ignoring non-initial packet for unknown connection {}",
788                dst_cid
789            );
790            None
791        } else if !event.first_decode.is_initial()
792            && self.local_cid_generator.validate(dst_cid).is_err()
793        {
794            // If we got this far, we're receiving a seemingly valid packet for an unknown
795            // connection. Send a stateless reset if possible.
796
797            debug!("dropping packet with invalid CID");
798            None
799        } else if dst_cid.is_empty() {
800            trace!("dropping unrecognized short packet without ID");
801            None
802        } else {
803            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
804                .map(DatagramEvent::Response)
805        }
806    }
807
808    fn stateless_reset(
809        &mut self,
810        now: Instant,
811        inciting_dgram_len: usize,
812        addresses: FourTuple,
813        dst_cid: ConnectionId,
814        buf: &mut Vec<u8>,
815    ) -> Option<Transmit> {
816        if self
817            .last_stateless_reset
818            .is_some_and(|last| last + self.config.min_reset_interval > now)
819        {
820            debug!("ignoring unexpected packet within minimum stateless reset interval");
821            return None;
822        }
823
824        /// Minimum amount of padding for the stateless reset to look like a short-header packet
825        const MIN_PADDING_LEN: usize = 5;
826
827        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
828        // smaller than the inciting packet.
829        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
830            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
831            _ => {
832                debug!(
833                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
834                    inciting_dgram_len
835                );
836                return None;
837            }
838        };
839
840        debug!(
841            "sending stateless reset for {} to {}",
842            dst_cid, addresses.remote
843        );
844        self.last_stateless_reset = Some(now);
845        // Resets with at least this much padding can't possibly be distinguished from real packets
846        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
847        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
848            max_padding_len
849        } else {
850            self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
851        };
852        buf.reserve(padding_len + RESET_TOKEN_SIZE);
853        buf.resize(padding_len, 0);
854        self.rng.fill_bytes(&mut buf[0..padding_len]);
855        buf[0] = 0b0100_0000 | (buf[0] >> 2);
856        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
857
858        debug_assert!(buf.len() < inciting_dgram_len);
859
860        Some(Transmit {
861            destination: addresses.remote,
862            ecn: None,
863            size: buf.len(),
864            segment_size: None,
865            src_ip: addresses.local_ip,
866        })
867    }
868
869    /// Initiate a connection
870    pub fn connect(
871        &mut self,
872        now: Instant,
873        config: ClientConfig,
874        remote: SocketAddr,
875        server_name: &str,
876    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
877        if self.cids_exhausted() {
878            return Err(ConnectError::CidsExhausted);
879        }
880        if remote.port() == 0 || remote.ip().is_unspecified() {
881            return Err(ConnectError::InvalidRemoteAddress(remote));
882        }
883        if !self.config.supported_versions.contains(&config.version) {
884            return Err(ConnectError::UnsupportedVersion);
885        }
886
887        let remote_id = (config.initial_dst_cid_provider)();
888        trace!(initial_dcid = %remote_id);
889
890        let ch = ConnectionHandle(self.connections.vacant_key());
891        let loc_cid = self.new_cid(ch);
892        let params = TransportParameters::new(
893            &config.transport,
894            &self.config,
895            self.local_cid_generator.as_ref(),
896            loc_cid,
897            None,
898            &mut self.rng,
899        );
900        let tls = config
901            .crypto
902            .start_session(config.version, server_name, &params)?;
903
904        let conn = self.add_connection(
905            ch,
906            config.version,
907            remote_id,
908            loc_cid,
909            remote_id,
910            FourTuple {
911                remote,
912                local_ip: None,
913            },
914            now,
915            tls,
916            config.transport,
917            SideArgs::Client {
918                token_store: config.token_store,
919                server_name: server_name.into(),
920            },
921        );
922        Ok((ch, conn))
923    }
924
925    fn send_new_identifiers(
926        &mut self,
927        now: Instant,
928        ch: ConnectionHandle,
929        num: u64,
930    ) -> ConnectionEvent {
931        let mut ids = vec![];
932        for _ in 0..num {
933            let id = self.new_cid(ch);
934            let meta = &mut self.connections[ch];
935            let sequence = meta.cids_issued;
936            meta.cids_issued += 1;
937            meta.loc_cids.insert(sequence, id);
938            ids.push(IssuedCid {
939                sequence,
940                id,
941                reset_token: ResetToken::new(&*self.config.reset_key, id),
942            });
943        }
944        ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
945    }
946
947    /// Generate a connection ID for `ch`
948    fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
949        loop {
950            let cid = self.local_cid_generator.generate_cid();
951            if cid.is_empty() {
952                // Zero-length CID; nothing to track
953                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
954                return cid;
955            }
956            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
957                e.insert(ch);
958                break cid;
959            }
960        }
961    }
962
963    fn handle_first_packet(
964        &mut self,
965        datagram_len: usize,
966        event: DatagramConnectionEvent,
967        addresses: FourTuple,
968        buf: &mut Vec<u8>,
969    ) -> Option<DatagramEvent> {
970        let dst_cid = event.first_decode.dst_cid();
971        let header = event.first_decode.initial_header().unwrap();
972
973        let Some(server_config) = &self.server_config else {
974            debug!("packet for unrecognized connection {}", dst_cid);
975            return self
976                .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
977                .map(DatagramEvent::Response);
978        };
979
980        if datagram_len < MIN_INITIAL_SIZE as usize {
981            debug!("ignoring short initial for connection {}", dst_cid);
982            return None;
983        }
984
985        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
986            Ok(keys) => keys,
987            Err(UnsupportedVersion) => {
988                // This probably indicates that the user set supported_versions incorrectly in
989                // `EndpointConfig`.
990                debug!(
991                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
992                    header.version
993                );
994                return None;
995            }
996        };
997
998        if let Err(reason) = self.early_validate_first_packet(header) {
999            return Some(DatagramEvent::Response(self.initial_close(
1000                header.version,
1001                addresses,
1002                &crypto,
1003                &header.src_cid,
1004                reason,
1005                buf,
1006            )));
1007        }
1008
1009        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
1010            Ok(packet) => packet,
1011            Err(e) => {
1012                trace!("unable to decode initial packet: {}", e);
1013                return None;
1014            }
1015        };
1016
1017        if !packet.reserved_bits_valid() {
1018            debug!("dropping connection attempt with invalid reserved bits");
1019            return None;
1020        }
1021
1022        let Header::Initial(header) = packet.header else {
1023            panic!("non-initial packet in handle_first_packet()");
1024        };
1025
1026        let server_config = self.server_config.as_ref().unwrap().clone();
1027
1028        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
1029            Ok(token) => token,
1030            Err(InvalidRetryTokenError) => {
1031                debug!("rejecting invalid retry token");
1032                return Some(DatagramEvent::Response(self.initial_close(
1033                    header.version,
1034                    addresses,
1035                    &crypto,
1036                    &header.src_cid,
1037                    TransportError::INVALID_TOKEN(""),
1038                    buf,
1039                )));
1040            }
1041        };
1042
1043        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
1044        self.index
1045            .insert_initial_incoming(header.dst_cid, incoming_idx);
1046
1047        Some(DatagramEvent::NewConnection(Incoming {
1048            received_at: event.now,
1049            addresses,
1050            ecn: event.ecn,
1051            packet: InitialPacket {
1052                header,
1053                header_data: packet.header_data,
1054                payload: packet.payload,
1055            },
1056            rest: event.remaining,
1057            crypto,
1058            token,
1059            incoming_idx,
1060            improper_drop_warner: IncomingImproperDropWarner,
1061        }))
1062    }
1063
1064    /// Attempt to accept this incoming connection (an error may still occur)
1065    // AcceptError cannot be made smaller without semver breakage
1066    #[allow(clippy::result_large_err)]
1067    pub fn accept(
1068        &mut self,
1069        mut incoming: Incoming,
1070        now: Instant,
1071        buf: &mut Vec<u8>,
1072        server_config: Option<Arc<ServerConfig>>,
1073    ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1074        let remote_address_validated = incoming.remote_address_validated();
1075        incoming.improper_drop_warner.dismiss();
1076        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1077        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1078
1079        let packet_number = incoming.packet.header.number.expand(0);
1080        let InitialHeader {
1081            src_cid,
1082            dst_cid,
1083            version,
1084            ..
1085        } = incoming.packet.header;
1086        let server_config =
1087            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
1088
1089        if server_config
1090            .transport
1091            .max_idle_timeout
1092            .is_some_and(|timeout| {
1093                incoming.received_at + Duration::from_millis(timeout.into()) <= now
1094            })
1095        {
1096            debug!("abandoning accept of stale initial");
1097            self.index.remove_initial(dst_cid);
1098            return Err(AcceptError {
1099                cause: ConnectionError::TimedOut,
1100                response: None,
1101            });
1102        }
1103
1104        if self.cids_exhausted() {
1105            debug!("refusing connection");
1106            self.index.remove_initial(dst_cid);
1107            return Err(AcceptError {
1108                cause: ConnectionError::CidsExhausted,
1109                response: Some(self.initial_close(
1110                    version,
1111                    incoming.addresses,
1112                    &incoming.crypto,
1113                    &src_cid,
1114                    TransportError::CONNECTION_REFUSED(""),
1115                    buf,
1116                )),
1117            });
1118        }
1119
1120        if incoming
1121            .crypto
1122            .packet
1123            .remote
1124            .decrypt(
1125                packet_number,
1126                &incoming.packet.header_data,
1127                &mut incoming.packet.payload,
1128            )
1129            .is_err()
1130        {
1131            debug!(packet_number, "failed to authenticate initial packet");
1132            self.index.remove_initial(dst_cid);
1133            return Err(AcceptError {
1134                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1135                response: None,
1136            });
1137        };
1138
1139        let ch = ConnectionHandle(self.connections.vacant_key());
1140        let loc_cid = self.new_cid(ch);
1141        let mut params = TransportParameters::new(
1142            &server_config.transport,
1143            &self.config,
1144            self.local_cid_generator.as_ref(),
1145            loc_cid,
1146            Some(&server_config),
1147            &mut self.rng,
1148        );
1149        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1150        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1151        params.retry_src_cid = incoming.token.retry_src_cid;
1152        let mut pref_addr_cid = None;
1153        if server_config.has_preferred_address() {
1154            let cid = self.new_cid(ch);
1155            pref_addr_cid = Some(cid);
1156            params.preferred_address = Some(PreferredAddress {
1157                address_v4: server_config.preferred_address_v4,
1158                address_v6: server_config.preferred_address_v6,
1159                connection_id: cid,
1160                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1161            });
1162        }
1163
1164        let tls = server_config.crypto.clone().start_session(version, &params);
1165        let transport_config = server_config.transport.clone();
1166        let mut conn = self.add_connection(
1167            ch,
1168            version,
1169            dst_cid,
1170            loc_cid,
1171            src_cid,
1172            incoming.addresses,
1173            incoming.received_at,
1174            tls,
1175            transport_config,
1176            SideArgs::Server {
1177                server_config,
1178                pref_addr_cid,
1179                path_validated: remote_address_validated,
1180            },
1181        );
1182        self.index.insert_initial(dst_cid, ch);
1183
1184        match conn.handle_first_packet(
1185            incoming.received_at,
1186            incoming.addresses.remote,
1187            incoming.ecn,
1188            packet_number,
1189            incoming.packet,
1190            incoming.rest,
1191        ) {
1192            Ok(()) => {
1193                trace!(id = ch.0, icid = %dst_cid, "new connection");
1194
1195                for event in incoming_buffer.datagrams {
1196                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1197                }
1198
1199                Ok((ch, conn))
1200            }
1201            Err(e) => {
1202                debug!("handshake failed: {}", e);
1203                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1204                let response = match e {
1205                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
1206                        version,
1207                        incoming.addresses,
1208                        &incoming.crypto,
1209                        &src_cid,
1210                        e.clone(),
1211                        buf,
1212                    )),
1213                    _ => None,
1214                };
1215                Err(AcceptError { cause: e, response })
1216            }
1217        }
1218    }
1219
1220    /// Check if we should refuse a connection attempt regardless of the packet's contents
1221    fn early_validate_first_packet(
1222        &mut self,
1223        header: &ProtectedInitialHeader,
1224    ) -> Result<(), TransportError> {
1225        let config = &self.server_config.as_ref().unwrap();
1226        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1227            return Err(TransportError::CONNECTION_REFUSED(""));
1228        }
1229
1230        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
1231        // bytes. If this is a Retry packet, then the length must instead match our usual CID
1232        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
1233        // also need to validate CID length for those after decoding the token.
1234        if header.dst_cid.len() < 8
1235            && (header.token_pos.is_empty()
1236                || header.dst_cid.len() != self.local_cid_generator.cid_len())
1237        {
1238            debug!(
1239                "rejecting connection due to invalid DCID length {}",
1240                header.dst_cid.len()
1241            );
1242            return Err(TransportError::PROTOCOL_VIOLATION(
1243                "invalid destination CID length",
1244            ));
1245        }
1246
1247        Ok(())
1248    }
1249
1250    /// Reject this incoming connection attempt
1251    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1252        self.clean_up_incoming(&incoming);
1253        incoming.improper_drop_warner.dismiss();
1254
1255        self.initial_close(
1256            incoming.packet.header.version,
1257            incoming.addresses,
1258            &incoming.crypto,
1259            &incoming.packet.header.src_cid,
1260            TransportError::CONNECTION_REFUSED(""),
1261            buf,
1262        )
1263    }
1264
1265    /// Respond with a retry packet, requiring the client to retry with address validation
1266    ///
1267    /// Errors if `incoming.may_retry()` is false.
1268    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1269        if !incoming.may_retry() {
1270            return Err(RetryError(Box::new(incoming)));
1271        }
1272
1273        self.clean_up_incoming(&incoming);
1274        incoming.improper_drop_warner.dismiss();
1275
1276        let server_config = self.server_config.as_ref().unwrap();
1277
1278        // First Initial
1279        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
1280        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
1281        // with established connections. In the unlikely event that a collision occurs
1282        // between two connections in the initial phase, both will fail fast and may be
1283        // retried by the application layer.
1284        let loc_cid = self.local_cid_generator.generate_cid();
1285
1286        let payload = TokenPayload::Retry {
1287            address: incoming.addresses.remote,
1288            orig_dst_cid: incoming.packet.header.dst_cid,
1289            issued: server_config.time_source.now(),
1290        };
1291        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1292
1293        let header = Header::Retry {
1294            src_cid: loc_cid,
1295            dst_cid: incoming.packet.header.src_cid,
1296            version: incoming.packet.header.version,
1297        };
1298
1299        let encode = header.encode(buf);
1300        buf.put_slice(&token);
1301        buf.extend_from_slice(&server_config.crypto.retry_tag(
1302            incoming.packet.header.version,
1303            &incoming.packet.header.dst_cid,
1304            buf,
1305        ));
1306        encode.finish(buf, &*incoming.crypto.header.local, None);
1307
1308        Ok(Transmit {
1309            destination: incoming.addresses.remote,
1310            ecn: None,
1311            size: buf.len(),
1312            segment_size: None,
1313            src_ip: incoming.addresses.local_ip,
1314        })
1315    }
1316
1317    /// Ignore this incoming connection attempt, not sending any packet in response
1318    ///
1319    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
1320    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
1321    pub fn ignore(&mut self, incoming: Incoming) {
1322        self.clean_up_incoming(&incoming);
1323        incoming.improper_drop_warner.dismiss();
1324    }
1325
1326    /// Clean up endpoint data structures associated with an `Incoming`.
1327    fn clean_up_incoming(&mut self, incoming: &Incoming) {
1328        self.index.remove_initial(incoming.packet.header.dst_cid);
1329        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1330        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1331    }
1332
1333    fn add_connection(
1334        &mut self,
1335        ch: ConnectionHandle,
1336        version: u32,
1337        init_cid: ConnectionId,
1338        loc_cid: ConnectionId,
1339        rem_cid: ConnectionId,
1340        addresses: FourTuple,
1341        now: Instant,
1342        tls: Box<dyn crypto::Session>,
1343        transport_config: Arc<TransportConfig>,
1344        side_args: SideArgs,
1345    ) -> Connection {
1346        let mut rng_seed = [0; 32];
1347        self.rng.fill_bytes(&mut rng_seed);
1348        let side = side_args.side();
1349        let pref_addr_cid = side_args.pref_addr_cid();
1350        let conn = Connection::new(
1351            self.config.clone(),
1352            transport_config,
1353            init_cid,
1354            loc_cid,
1355            rem_cid,
1356            addresses.remote,
1357            addresses.local_ip,
1358            tls,
1359            self.local_cid_generator.as_ref(),
1360            now,
1361            version,
1362            self.allow_mtud,
1363            rng_seed,
1364            side_args,
1365        );
1366
1367        let mut cids_issued = 0;
1368        let mut loc_cids = FxHashMap::default();
1369
1370        loc_cids.insert(cids_issued, loc_cid);
1371        cids_issued += 1;
1372
1373        if let Some(cid) = pref_addr_cid {
1374            debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1375            loc_cids.insert(cids_issued, cid);
1376            cids_issued += 1;
1377        }
1378
1379        let id = self.connections.insert(ConnectionMeta {
1380            init_cid,
1381            cids_issued,
1382            loc_cids,
1383            addresses,
1384            side,
1385            reset_token: None,
1386            peer_id: None,
1387        });
1388        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1389
1390        self.index.insert_conn(addresses, loc_cid, ch, side);
1391
1392        conn
1393    }
1394
1395    fn initial_close(
1396        &mut self,
1397        version: u32,
1398        addresses: FourTuple,
1399        crypto: &Keys,
1400        remote_id: &ConnectionId,
1401        reason: TransportError,
1402        buf: &mut Vec<u8>,
1403    ) -> Transmit {
1404        // We don't need to worry about CID collisions in initial closes because the peer
1405        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
1406        // unexpected response.
1407        let local_id = self.local_cid_generator.generate_cid();
1408        let number = PacketNumber::U8(0);
1409        let header = Header::Initial(InitialHeader {
1410            dst_cid: *remote_id,
1411            src_cid: local_id,
1412            number,
1413            token: Bytes::new(),
1414            version,
1415        });
1416
1417        let partial_encode = header.encode(buf);
1418        let max_len =
1419            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1420        frame::Close::from(reason).encode(buf, max_len);
1421        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1422        partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1423        Transmit {
1424            destination: addresses.remote,
1425            ecn: None,
1426            size: buf.len(),
1427            segment_size: None,
1428            src_ip: addresses.local_ip,
1429        }
1430    }
1431
1432    /// Access the configuration used by this endpoint
1433    pub fn config(&self) -> &EndpointConfig {
1434        &self.config
1435    }
1436
1437    /// Enable or disable address discovery for this endpoint
1438    ///
1439    /// Address discovery is enabled by default. When enabled, the endpoint will:
1440    /// - Send OBSERVED_ADDRESS frames to peers to inform them of their reflexive addresses
1441    /// - Process received OBSERVED_ADDRESS frames to learn about its own reflexive addresses
1442    /// - Integrate discovered addresses with NAT traversal for improved connectivity
1443    pub fn enable_address_discovery(&mut self, enabled: bool) {
1444        self.address_discovery_enabled = enabled;
1445        // Note: Existing connections will continue with their current setting.
1446        // New connections will use the updated setting.
1447    }
1448
1449    /// Check if address discovery is enabled
1450    pub fn address_discovery_enabled(&self) -> bool {
1451        self.address_discovery_enabled
1452    }
1453
1454    /// Get all discovered addresses across all connections
1455    ///
1456    /// Returns a list of unique socket addresses that have been observed
1457    /// by remote peers and reported via OBSERVED_ADDRESS frames.
1458    ///
1459    /// Note: This returns an empty vector in the current implementation.
1460    /// Applications should track discovered addresses at the connection level.
1461    pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1462        // TODO: Implement address tracking at the endpoint level
1463        Vec::new()
1464    }
1465
1466    /// Set a callback to be invoked when an address change is detected
1467    ///
1468    /// The callback receives the old address (if any) and the new address.
1469    /// Only one callback can be set at a time; setting a new callback replaces the previous one.
1470    pub fn set_address_change_callback<F>(&mut self, callback: F)
1471    where
1472        F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1473    {
1474        self.address_change_callback = Some(Box::new(callback));
1475    }
1476
1477    /// Clear the address change callback
1478    pub fn clear_address_change_callback(&mut self) {
1479        self.address_change_callback = None;
1480    }
1481
1482    /// Get address discovery statistics
1483    ///
1484    /// Note: This returns default statistics in the current implementation.
1485    /// Applications should track statistics at the connection level.
1486    pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1487        // TODO: Implement statistics tracking at the endpoint level
1488        AddressDiscoveryStats::default()
1489    }
1490
1491    /// Number of connections that are currently open
1492    pub fn open_connections(&self) -> usize {
1493        self.connections.len()
1494    }
1495
1496    /// Counter for the number of bytes currently used
1497    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
1498    pub fn incoming_buffer_bytes(&self) -> u64 {
1499        self.all_incoming_buffers_total_bytes
1500    }
1501
1502    #[cfg(test)]
1503    pub(crate) fn known_connections(&self) -> usize {
1504        let x = self.connections.len();
1505        debug_assert_eq!(x, self.index.connection_ids_initial.len());
1506        // Not all connections have known reset tokens
1507        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1508        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
1509        debug_assert!(x >= self.index.incoming_connection_remotes.len());
1510        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1511        x
1512    }
1513
1514    #[cfg(test)]
1515    pub(crate) fn known_cids(&self) -> usize {
1516        self.index.connection_ids.len()
1517    }
1518
1519    /// Whether we've used up 3/4 of the available CID space
1520    ///
1521    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
1522    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
1523    fn cids_exhausted(&self) -> bool {
1524        self.local_cid_generator.cid_len() <= 4
1525            && self.local_cid_generator.cid_len() != 0
1526            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1527                - self.index.connection_ids.len())
1528                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1529    }
1530}
1531
1532impl fmt::Debug for Endpoint {
1533    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1534        fmt.debug_struct("Endpoint")
1535            .field("rng", &self.rng)
1536            .field("index", &self.index)
1537            .field("connections", &self.connections)
1538            .field("config", &self.config)
1539            .field("server_config", &self.server_config)
1540            // incoming_buffers too large
1541            .field("incoming_buffers.len", &self.incoming_buffers.len())
1542            .field(
1543                "all_incoming_buffers_total_bytes",
1544                &self.all_incoming_buffers_total_bytes,
1545            )
1546            .finish()
1547    }
1548}
1549
1550/// Buffered Initial and 0-RTT messages for a pending incoming connection
1551#[derive(Default)]
1552struct IncomingBuffer {
1553    datagrams: Vec<DatagramConnectionEvent>,
1554    total_bytes: u64,
1555}
1556
1557/// Part of protocol state incoming datagrams can be routed to
1558#[derive(Copy, Clone, Debug)]
1559enum RouteDatagramTo {
1560    Incoming(usize),
1561    Connection(ConnectionHandle),
1562}
1563
1564/// Maps packets to existing connections
1565#[derive(Default, Debug)]
1566struct ConnectionIndex {
1567    /// Identifies connections based on the initial DCID the peer utilized
1568    ///
1569    /// Uses a standard `HashMap` to protect against hash collision attacks.
1570    ///
1571    /// Used by the server, not the client.
1572    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1573    /// Identifies connections based on locally created CIDs
1574    ///
1575    /// Uses a cheaper hash function since keys are locally created
1576    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1577    /// Identifies incoming connections with zero-length CIDs
1578    ///
1579    /// Uses a standard `HashMap` to protect against hash collision attacks.
1580    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1581    /// Identifies outgoing connections with zero-length CIDs
1582    ///
1583    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
1584    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
1585    /// may be established per remote. We must omit the local address from the key because we don't
1586    /// necessarily know what address we're sending from, and hence receiving at.
1587    ///
1588    /// Uses a standard `HashMap` to protect against hash collision attacks.
1589    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1590    /// Reset tokens provided by the peer for the CID each connection is currently sending to
1591    ///
1592    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
1593    /// recipient, if any.
1594    connection_reset_tokens: ResetTokenTable,
1595}
1596
1597impl ConnectionIndex {
1598    /// Associate an incoming connection with its initial destination CID
1599    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1600        if dst_cid.is_empty() {
1601            return;
1602        }
1603        self.connection_ids_initial
1604            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1605    }
1606
1607    /// Remove an association with an initial destination CID
1608    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1609        if dst_cid.is_empty() {
1610            return;
1611        }
1612        let removed = self.connection_ids_initial.remove(&dst_cid);
1613        debug_assert!(removed.is_some());
1614    }
1615
1616    /// Associate a connection with its initial destination CID
1617    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1618        if dst_cid.is_empty() {
1619            return;
1620        }
1621        self.connection_ids_initial
1622            .insert(dst_cid, RouteDatagramTo::Connection(connection));
1623    }
1624
1625    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1626    /// its current 4-tuple
1627    fn insert_conn(
1628        &mut self,
1629        addresses: FourTuple,
1630        dst_cid: ConnectionId,
1631        connection: ConnectionHandle,
1632        side: Side,
1633    ) {
1634        match dst_cid.len() {
1635            0 => match side {
1636                Side::Server => {
1637                    self.incoming_connection_remotes
1638                        .insert(addresses, connection);
1639                }
1640                Side::Client => {
1641                    self.outgoing_connection_remotes
1642                        .insert(addresses.remote, connection);
1643                }
1644            },
1645            _ => {
1646                self.connection_ids.insert(dst_cid, connection);
1647            }
1648        }
1649    }
1650
1651    /// Discard a connection ID
1652    fn retire(&mut self, dst_cid: ConnectionId) {
1653        self.connection_ids.remove(&dst_cid);
1654    }
1655
1656    /// Remove all references to a connection
1657    fn remove(&mut self, conn: &ConnectionMeta) {
1658        if conn.side.is_server() {
1659            self.remove_initial(conn.init_cid);
1660        }
1661        for cid in conn.loc_cids.values() {
1662            self.connection_ids.remove(cid);
1663        }
1664        self.incoming_connection_remotes.remove(&conn.addresses);
1665        self.outgoing_connection_remotes
1666            .remove(&conn.addresses.remote);
1667        if let Some((remote, token)) = conn.reset_token {
1668            self.connection_reset_tokens.remove(remote, token);
1669        }
1670    }
1671
1672    /// Find the existing connection that `datagram` should be routed to, if any
1673    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1674        let dst_cid = datagram.dst_cid();
1675        let is_empty_cid = dst_cid.is_empty();
1676
1677        // Fast path: Try most common lookup first (non-empty CID)
1678        if !is_empty_cid {
1679            if let Some(&ch) = self.connection_ids.get(dst_cid) {
1680                return Some(RouteDatagramTo::Connection(ch));
1681            }
1682        }
1683
1684        // Initial/0RTT packet lookup
1685        if datagram.is_initial() || datagram.is_0rtt() {
1686            if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1687                return Some(ch);
1688            }
1689        }
1690
1691        // Empty CID lookup (less common, do after fast path)
1692        if is_empty_cid {
1693            // Check incoming connections first (servers handle more incoming)
1694            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1695                return Some(RouteDatagramTo::Connection(ch));
1696            }
1697            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1698                return Some(RouteDatagramTo::Connection(ch));
1699            }
1700        }
1701
1702        // Stateless reset token lookup (least common, do last)
1703        let data = datagram.data();
1704        if data.len() < RESET_TOKEN_SIZE {
1705            return None;
1706        }
1707        self.connection_reset_tokens
1708            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1709            .cloned()
1710            .map(RouteDatagramTo::Connection)
1711    }
1712}
1713
1714#[derive(Debug)]
1715pub(crate) struct ConnectionMeta {
1716    init_cid: ConnectionId,
1717    /// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
1718    cids_issued: u64,
1719    loc_cids: FxHashMap<u64, ConnectionId>,
1720    /// Remote/local addresses the connection began with
1721    ///
1722    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1723    /// bother keeping it up to date.
1724    addresses: FourTuple,
1725    side: Side,
1726    /// Reset token provided by the peer for the CID we're currently sending to, and the address
1727    /// being sent to
1728    reset_token: Option<(SocketAddr, ResetToken)>,
1729    /// Peer ID for this connection, used for relay functionality
1730    peer_id: Option<PeerId>,
1731}
1732
1733/// Internal identifier for a `Connection` currently associated with an endpoint
1734#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1735pub struct ConnectionHandle(pub usize);
1736
1737impl From<ConnectionHandle> for usize {
1738    fn from(x: ConnectionHandle) -> Self {
1739        x.0
1740    }
1741}
1742
1743impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1744    type Output = ConnectionMeta;
1745    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1746        &self[ch.0]
1747    }
1748}
1749
1750impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1751    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1752        &mut self[ch.0]
1753    }
1754}
1755
1756/// Event resulting from processing a single datagram
1757pub enum DatagramEvent {
1758    /// The datagram is redirected to its `Connection`
1759    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1760    /// The datagram may result in starting a new `Connection`
1761    NewConnection(Incoming),
1762    /// Response generated directly by the endpoint
1763    Response(Transmit),
1764}
1765
1766/// An incoming connection for which the server has not yet begun its part of the handshake.
1767pub struct Incoming {
1768    received_at: Instant,
1769    addresses: FourTuple,
1770    ecn: Option<EcnCodepoint>,
1771    packet: InitialPacket,
1772    rest: Option<BytesMut>,
1773    crypto: Keys,
1774    token: IncomingToken,
1775    incoming_idx: usize,
1776    improper_drop_warner: IncomingImproperDropWarner,
1777}
1778
1779impl Incoming {
1780    /// The local IP address which was used when the peer established the connection
1781    ///
1782    /// This has the same behavior as [`Connection::local_ip`].
1783    pub fn local_ip(&self) -> Option<IpAddr> {
1784        self.addresses.local_ip
1785    }
1786
1787    /// The peer's UDP address
1788    pub fn remote_address(&self) -> SocketAddr {
1789        self.addresses.remote
1790    }
1791
1792    /// Whether the socket address that is initiating this connection has been validated
1793    ///
1794    /// This means that the sender of the initial packet has proved that they can receive traffic
1795    /// sent to `self.remote_address()`.
1796    ///
1797    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1798    /// The inverse is not guaranteed.
1799    pub fn remote_address_validated(&self) -> bool {
1800        self.token.validated
1801    }
1802
1803    /// Whether it is legal to respond with a retry packet
1804    ///
1805    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1806    /// The inverse is not guaranteed.
1807    pub fn may_retry(&self) -> bool {
1808        self.token.retry_src_cid.is_none()
1809    }
1810
1811    /// The original destination connection ID sent by the client
1812    pub fn orig_dst_cid(&self) -> &ConnectionId {
1813        &self.token.orig_dst_cid
1814    }
1815}
1816
1817impl fmt::Debug for Incoming {
1818    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1819        f.debug_struct("Incoming")
1820            .field("addresses", &self.addresses)
1821            .field("ecn", &self.ecn)
1822            // packet doesn't implement debug
1823            // rest is too big and not meaningful enough
1824            .field("token", &self.token)
1825            .field("incoming_idx", &self.incoming_idx)
1826            // improper drop warner contains no information
1827            .finish_non_exhaustive()
1828    }
1829}
1830
1831struct IncomingImproperDropWarner;
1832
1833impl IncomingImproperDropWarner {
1834    fn dismiss(self) {
1835        mem::forget(self);
1836    }
1837}
1838
1839impl Drop for IncomingImproperDropWarner {
1840    fn drop(&mut self) {
1841        warn!(
1842            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1843               (may cause memory leak and eventual inability to accept new connections)"
1844        );
1845    }
1846}
1847
1848/// Errors in the parameters being used to create a new connection
1849///
1850/// These arise before any I/O has been performed.
1851#[derive(Debug, Error, Clone, PartialEq, Eq)]
1852pub enum ConnectError {
1853    /// The endpoint can no longer create new connections
1854    ///
1855    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1856    #[error("endpoint stopping")]
1857    EndpointStopping,
1858    /// The connection could not be created because not enough of the CID space is available
1859    ///
1860    /// Try using longer connection IDs
1861    #[error("CIDs exhausted")]
1862    CidsExhausted,
1863    /// The given server name was malformed
1864    #[error("invalid server name: {0}")]
1865    InvalidServerName(String),
1866    /// The remote [`SocketAddr`] supplied was malformed
1867    ///
1868    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1869    #[error("invalid remote address: {0}")]
1870    InvalidRemoteAddress(SocketAddr),
1871    /// No default client configuration was set up
1872    ///
1873    /// Use `Endpoint::connect_with` to specify a client configuration.
1874    #[error("no default client config")]
1875    NoDefaultClientConfig,
1876    /// The local endpoint does not support the QUIC version specified in the client configuration
1877    #[error("unsupported QUIC version")]
1878    UnsupportedVersion,
1879}
1880
1881/// Error type for attempting to accept an [`Incoming`]
1882#[derive(Debug)]
1883pub struct AcceptError {
1884    /// Underlying error describing reason for failure
1885    pub cause: ConnectionError,
1886    /// Optional response to transmit back
1887    pub response: Option<Transmit>,
1888}
1889
1890/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1891#[derive(Debug, Error)]
1892#[error("retry() with validated Incoming")]
1893pub struct RetryError(Box<Incoming>);
1894
1895impl RetryError {
1896    /// Get the [`Incoming`]
1897    pub fn into_incoming(self) -> Incoming {
1898        *self.0
1899    }
1900}
1901
1902/// Reset Tokens which are associated with peer socket addresses
1903///
1904/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1905/// peer generated and might be usable for hash collision attacks.
1906#[derive(Default, Debug)]
1907struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1908
1909impl ResetTokenTable {
1910    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1911        self.0
1912            .entry(remote)
1913            .or_default()
1914            .insert(token, ch)
1915            .is_some()
1916    }
1917
1918    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1919        use std::collections::hash_map::Entry;
1920        match self.0.entry(remote) {
1921            Entry::Vacant(_) => {}
1922            Entry::Occupied(mut e) => {
1923                e.get_mut().remove(&token);
1924                if e.get().is_empty() {
1925                    e.remove_entry();
1926                }
1927            }
1928        }
1929    }
1930
1931    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1932        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1933        self.0.get(&remote)?.get(&token)
1934    }
1935}
1936
1937/// Identifies a connection by the combination of remote and local addresses
1938///
1939/// Including the local ensures good behavior when the host has multiple IP addresses on the same
1940/// subnet and zero-length connection IDs are in use.
1941#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
1942struct FourTuple {
1943    remote: SocketAddr,
1944    // A single socket can only listen on a single port, so no need to store it explicitly
1945    local_ip: Option<IpAddr>,
1946}