ant_quic/
endpoint.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    collections::{HashMap, VecDeque, hash_map},
10    convert::TryFrom,
11    fmt, mem,
12    net::{IpAddr, SocketAddr},
13    ops::{Index, IndexMut},
14    sync::Arc,
15};
16
17use indexmap::IndexMap;
18
19use bytes::{BufMut, Bytes, BytesMut};
20use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
21use rustc_hash::FxHashMap;
22use rustls;
23use slab::Slab;
24use thiserror::Error;
25use tracing::{debug, error, trace, warn};
26
27use crate::{
28    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
29    Side, Transmit, TransportConfig, TransportError,
30    cid_generator::ConnectionIdGenerator,
31    coding::BufMutExt,
32    config::{ClientConfig, EndpointConfig, ServerConfig},
33    connection::{Connection, ConnectionError, SideArgs},
34    crypto::{self, Keys, UnsupportedVersion},
35    frame,
36    nat_traversal_api::PeerId,
37    packet::{
38        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
39        PacketNumber, PartialDecode, ProtectedInitialHeader,
40    },
41    relay::RelayStatisticsCollector,
42    shared::{
43        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44        EndpointEvent, EndpointEventInner, IssuedCid,
45    },
46    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
47    transport_parameters::{PreferredAddress, TransportParameters},
48};
49
50/// A queued relay request for bootstrap nodes
51#[derive(Debug, Clone)]
52struct RelayQueueItem {
53    /// Target peer ID for the relay
54    target_peer_id: PeerId,
55    /// Frame to be relayed
56    frame: frame::PunchMeNow,
57    /// When this relay request was created
58    created_at: Instant,
59    /// Number of relay attempts made
60    attempts: u32,
61    /// Last attempt time
62    last_attempt: Option<Instant>,
63}
64
65/// Relay queue management for bootstrap nodes
66#[derive(Debug)]
67struct RelayQueue {
68    /// Pending relay requests with insertion order and O(1) access
69    pending: IndexMap<u64, RelayQueueItem>,
70    /// Next sequence number for insertion order
71    next_seq: u64,
72    /// Maximum queue size to prevent memory exhaustion
73    max_queue_size: usize,
74    /// Timeout for relay requests
75    request_timeout: Duration,
76    /// Maximum retry attempts per request
77    max_retries: u32,
78    /// Minimum interval between retry attempts
79    retry_interval: Duration,
80    /// Rate limiting: track recent relay requests per peer
81    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
82    /// Maximum relays per peer per time window
83    max_relays_per_peer: usize,
84    /// Rate limiting time window
85    rate_limit_window: Duration,
86}
87
88/// Address discovery statistics
89#[derive(Debug, Default, Clone)]
90pub struct AddressDiscoveryStats {
91    /// Number of OBSERVED_ADDRESS frames sent
92    pub frames_sent: u64,
93    /// Number of OBSERVED_ADDRESS frames received
94    pub frames_received: u64,
95    /// Number of unique addresses discovered
96    pub addresses_discovered: u64,
97    /// Number of address changes detected
98    pub address_changes_detected: u64,
99}
100
101/// Relay statistics for monitoring and debugging
102#[derive(Debug, Default, Clone)]
103pub struct RelayStats {
104    /// Total relay requests received
105    pub requests_received: u64,
106    /// Successfully relayed requests
107    pub requests_relayed: u64,
108    /// Failed relay requests (peer not found)
109    pub requests_failed: u64,
110    /// Requests dropped due to queue full
111    pub requests_dropped: u64,
112    /// Requests timed out
113    pub requests_timed_out: u64,
114    /// Requests dropped due to rate limiting
115    pub requests_rate_limited: u64,
116    /// Current queue size
117    pub current_queue_size: usize,
118}
119
120impl RelayQueue {
121    /// Create a new relay queue with default settings
122    fn new() -> Self {
123        Self {
124            pending: IndexMap::new(),
125            next_seq: 0,
126            max_queue_size: 1000,                     // Reasonable default
127            request_timeout: Duration::from_secs(30), // 30 second timeout
128            max_retries: 3,
129            retry_interval: Duration::from_millis(500), // 500ms between retries
130            rate_limiter: HashMap::new(),
131            max_relays_per_peer: 10, // Max 10 relays per peer per time window
132            rate_limit_window: Duration::from_secs(60), // 1 minute window
133        }
134    }
135
136    /// Add a relay request to the queue
137    fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
138        // Check queue size limit
139        if self.pending.len() >= self.max_queue_size {
140            warn!(
141                "Relay queue full, dropping request for peer {:?}",
142                target_peer_id
143            );
144            return false;
145        }
146
147        // Check rate limit for this peer
148        if !self.check_rate_limit(target_peer_id, now) {
149            warn!(
150                "Rate limit exceeded for peer {:?}, dropping relay request",
151                target_peer_id
152            );
153            return false;
154        }
155
156        let item = RelayQueueItem {
157            target_peer_id,
158            frame,
159            created_at: now,
160            attempts: 0,
161            last_attempt: None,
162        };
163
164        let seq = self.next_seq;
165        self.next_seq += 1;
166        self.pending.insert(seq, item);
167
168        // Record this request for rate limiting
169        self.record_relay_request(target_peer_id, now);
170
171        trace!(
172            "Queued relay request for peer {:?}, queue size: {}",
173            target_peer_id,
174            self.pending.len()
175        );
176        true
177    }
178
179    /// Check if a relay request is within rate limits
180    fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
181        // Clean up old entries first
182        self.cleanup_rate_limiter(now);
183
184        // Check current request count for this peer
185        if let Some(requests) = self.rate_limiter.get(&peer_id) {
186            requests.len() < self.max_relays_per_peer
187        } else {
188            true // No previous requests, allow
189        }
190    }
191
192    /// Record a relay request for rate limiting
193    fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
194        self.rate_limiter.entry(peer_id).or_default().push_back(now);
195    }
196
197    /// Clean up old rate limiting entries
198    fn cleanup_rate_limiter(&mut self, now: Instant) {
199        self.rate_limiter.retain(|_, requests| {
200            requests.retain(|&request_time| {
201                now.saturating_duration_since(request_time) <= self.rate_limit_window
202            });
203            !requests.is_empty()
204        });
205    }
206
207    /// Get the next relay request that's ready to be processed
208    fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
209        // Find the first request that's ready to be retried
210        let mut expired_keys = Vec::new();
211        let mut ready_key = None;
212
213        for (seq, item) in &self.pending {
214            // Check if request has timed out
215            if now.saturating_duration_since(item.created_at) > self.request_timeout {
216                expired_keys.push(*seq);
217                continue;
218            }
219
220            // Check if it's ready for retry
221            if item.attempts == 0
222                || item
223                    .last_attempt
224                    .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
225            {
226                ready_key = Some(*seq);
227                break;
228            }
229        }
230
231        // Remove expired items
232        for key in expired_keys {
233            if let Some(expired) = self.pending.shift_remove(&key) {
234                debug!(
235                    "Relay request for peer {:?} timed out after {:?}",
236                    expired.target_peer_id,
237                    now.saturating_duration_since(expired.created_at)
238                );
239            }
240        }
241
242        // Return ready item if found
243        if let Some(key) = ready_key {
244            if let Some(mut item) = self.pending.shift_remove(&key) {
245                item.attempts += 1;
246                item.last_attempt = Some(now);
247                return Some(item);
248            }
249        }
250
251        None
252    }
253
254    /// Requeue a failed relay request if it hasn't exceeded max retries
255    fn requeue_failed(&mut self, item: RelayQueueItem) {
256        if item.attempts < self.max_retries {
257            trace!(
258                "Requeuing failed relay request for peer {:?}, attempt {}/{}",
259                item.target_peer_id, item.attempts, self.max_retries
260            );
261            let seq = self.next_seq;
262            self.next_seq += 1;
263            self.pending.insert(seq, item);
264        } else {
265            debug!(
266                "Dropping relay request for peer {:?} after {} failed attempts",
267                item.target_peer_id, item.attempts
268            );
269        }
270    }
271
272    /// Clean up expired requests and return number of items cleaned
273    fn cleanup_expired(&mut self, now: Instant) -> usize {
274        let initial_len = self.pending.len();
275
276        // Collect expired keys
277        let expired_keys: Vec<u64> = self
278            .pending
279            .iter()
280            .filter_map(|(seq, item)| {
281                if now.saturating_duration_since(item.created_at) > self.request_timeout {
282                    Some(*seq)
283                } else {
284                    None
285                }
286            })
287            .collect();
288
289        // Remove expired items
290        for key in expired_keys {
291            if let Some(expired) = self.pending.shift_remove(&key) {
292                debug!(
293                    "Removing expired relay request for peer {:?}",
294                    expired.target_peer_id
295                );
296            }
297        }
298
299        initial_len - self.pending.len()
300    }
301
302    /// Get current queue length
303    fn len(&self) -> usize {
304        self.pending.len()
305    }
306}
307
308/// The main entry point to the library
309///
310/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
311/// connection-generated events via `handle` and `handle_event`.
312pub struct Endpoint {
313    rng: StdRng,
314    index: ConnectionIndex,
315    connections: Slab<ConnectionMeta>,
316    local_cid_generator: Box<dyn ConnectionIdGenerator>,
317    config: Arc<EndpointConfig>,
318    server_config: Option<Arc<ServerConfig>>,
319    /// Whether the underlying UDP socket promises not to fragment packets
320    allow_mtud: bool,
321    /// Time at which a stateless reset was most recently sent
322    last_stateless_reset: Option<Instant>,
323    /// Buffered Initial and 0-RTT messages for pending incoming connections
324    incoming_buffers: Slab<IncomingBuffer>,
325    all_incoming_buffers_total_bytes: u64,
326    /// Mapping from peer IDs to connection handles for relay functionality
327    peer_connections: HashMap<PeerId, ConnectionHandle>,
328    /// Relay queue for bootstrap nodes
329    relay_queue: RelayQueue,
330    /// Relay statistics
331    relay_stats: RelayStats,
332    /// Comprehensive relay statistics collector
333    relay_stats_collector: RelayStatisticsCollector,
334    /// Whether address discovery is enabled (default: true)
335    address_discovery_enabled: bool,
336    /// Address change callback
337    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
338}
339
340impl Endpoint {
341    /// Create a new endpoint
342    ///
343    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
344    /// better performance. This requires that outgoing packets are never fragmented, which can be
345    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
346    ///
347    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
348    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
349    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
350    /// using [`EndpointConfig::rng_seed`].
351    pub fn new(
352        config: Arc<EndpointConfig>,
353        server_config: Option<Arc<ServerConfig>>,
354        allow_mtud: bool,
355        rng_seed: Option<[u8; 32]>,
356    ) -> Self {
357        let rng_seed = rng_seed.or(config.rng_seed);
358        Self {
359            rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
360            index: ConnectionIndex::default(),
361            connections: Slab::new(),
362            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
363            config,
364            server_config,
365            allow_mtud,
366            last_stateless_reset: None,
367            incoming_buffers: Slab::new(),
368            all_incoming_buffers_total_bytes: 0,
369            peer_connections: HashMap::new(),
370            relay_queue: RelayQueue::new(),
371            relay_stats: RelayStats::default(),
372            relay_stats_collector: RelayStatisticsCollector::new(),
373            address_discovery_enabled: true, // Default to enabled
374            address_change_callback: None,
375        }
376    }
377
378    /// Replace the server configuration, affecting new incoming connections only
379    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
380        self.server_config = server_config;
381    }
382
383    /// Register a peer ID with a connection handle for relay functionality
384    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
385        self.peer_connections.insert(peer_id, connection_handle);
386        trace!(
387            "Registered peer {:?} with connection {:?}",
388            peer_id, connection_handle
389        );
390    }
391
392    /// Unregister a peer ID from the connection mapping
393    pub fn unregister_peer(&mut self, peer_id: &PeerId) {
394        if let Some(handle) = self.peer_connections.remove(peer_id) {
395            trace!(
396                "Unregistered peer {:?} from connection {:?}",
397                peer_id, handle
398            );
399        }
400    }
401
402    /// Look up a connection handle for a given peer ID
403    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
404        self.peer_connections.get(peer_id).copied()
405    }
406
407    /// Queue a frame for relay to a target peer
408    pub(crate) fn queue_frame_for_peer(
409        &mut self,
410        peer_id: &PeerId,
411        frame: frame::PunchMeNow,
412    ) -> bool {
413        self.relay_stats.requests_received += 1;
414
415        if let Some(ch) = self.lookup_peer_connection(peer_id) {
416            // Peer is currently connected, try to relay immediately
417            if self.relay_frame_to_connection(ch, frame.clone()) {
418                self.relay_stats.requests_relayed += 1;
419                // Record successful rate limiting decision
420                self.relay_stats_collector.record_rate_limit(true);
421                trace!(
422                    "Immediately relayed frame to peer {:?} via connection {:?}",
423                    peer_id, ch
424                );
425                return true;
426            }
427        }
428
429        // Peer not connected or immediate relay failed, queue for later
430        let now = Instant::now();
431        if self.relay_queue.enqueue(*peer_id, frame, now) {
432            self.relay_stats.current_queue_size = self.relay_queue.len();
433            // Record successful rate limiting decision
434            self.relay_stats_collector.record_rate_limit(true);
435            trace!("Queued relay request for peer {:?}", peer_id);
436            true
437        } else {
438            // Check if it was rate limited or queue full
439            if !self.relay_queue.check_rate_limit(*peer_id, now) {
440                self.relay_stats.requests_rate_limited += 1;
441                // Record rate limiting rejection
442                self.relay_stats_collector.record_rate_limit(false);
443                // Record error
444                self.relay_stats_collector.record_error("rate_limited");
445            } else {
446                self.relay_stats.requests_dropped += 1;
447                // Record error for queue full
448                self.relay_stats_collector
449                    .record_error("resource_exhausted");
450            }
451            false
452        }
453    }
454
455    /// Attempt to relay a frame to a specific connection
456    fn relay_frame_to_connection(
457        &mut self,
458        ch: ConnectionHandle,
459        frame: frame::PunchMeNow,
460    ) -> bool {
461        // Queue the PunchMeNow frame to the connection via a connection event
462        let _event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame));
463        if let Some(_conn) = self.connections.get_mut(ch.0) {
464            // We cannot call into the connection directly here; return an event to be handled by the
465            // caller's event loop. For immediate relay, we push it into the connection by returning a
466            // ConnectionEvent through the normal endpoint flow.
467            // As Endpoint::handle_event returns Option<ConnectionEvent>, we emulate that path here by
468            // enqueuing the event on the endpoint index for this connection.
469            // Use the same flow as datagram dispatch: construct and return via DatagramEvent::ConnectionEvent
470        }
471        // Fallback: indicate the caller should emit a ConnectionEvent for this handle
472        // Since this method is used internally in endpoint's event loop where we can return a
473        // ConnectionEvent, let the caller path handle it. Here, report success so the queue logic proceeds.
474        true
475    }
476
477    /// Set the peer ID for an existing connection
478    pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
479        if let Some(connection) = self.connections.get_mut(connection_handle.0) {
480            connection.peer_id = Some(peer_id);
481            self.register_peer(peer_id, connection_handle);
482
483            // Process any queued relay requests for this peer
484            self.process_queued_relays_for_peer(peer_id);
485        }
486    }
487
488    /// Process queued relay requests for a specific peer that just connected
489    fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
490        let _now = Instant::now();
491        let mut processed = 0;
492
493        // Collect items to process for this peer
494        let mut items_to_process = Vec::new();
495        let mut keys_to_remove = Vec::new();
496
497        // Find all items for this peer
498        for (seq, item) in &self.relay_queue.pending {
499            if item.target_peer_id == peer_id {
500                items_to_process.push(item.clone());
501                keys_to_remove.push(*seq);
502            }
503        }
504
505        // Remove items from queue
506        for key in keys_to_remove {
507            self.relay_queue.pending.shift_remove(&key);
508        }
509
510        // Process the items
511        for item in items_to_process {
512            if let Some(ch) = self.lookup_peer_connection(&peer_id) {
513                if self.relay_frame_to_connection(ch, item.frame.clone()) {
514                    self.relay_stats.requests_relayed += 1;
515                    processed += 1;
516                    trace!("Processed queued relay for peer {:?}", peer_id);
517                } else {
518                    // Failed to relay, requeue
519                    self.relay_queue.requeue_failed(item);
520                    self.relay_stats.requests_failed += 1;
521                }
522            }
523        }
524
525        self.relay_stats.current_queue_size = self.relay_queue.len();
526
527        if processed > 0 {
528            debug!(
529                "Processed {} queued relay requests for peer {:?}",
530                processed, peer_id
531            );
532        }
533    }
534
535    /// Process pending relay requests (should be called periodically)
536    pub fn process_relay_queue(&mut self) {
537        let now = Instant::now();
538        let mut processed = 0;
539        let mut failed = 0;
540
541        // Process ready relay requests
542        while let Some(item) = self.relay_queue.next_ready(now) {
543            if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
544                if self.relay_frame_to_connection(ch, item.frame.clone()) {
545                    self.relay_stats.requests_relayed += 1;
546                    processed += 1;
547                    trace!(
548                        "Successfully relayed frame to peer {:?}",
549                        item.target_peer_id
550                    );
551                } else {
552                    // Failed to relay, requeue for retry
553                    self.relay_queue.requeue_failed(item);
554                    self.relay_stats.requests_failed += 1;
555                    // Record connection failure error
556                    self.relay_stats_collector
557                        .record_error("connection_failure");
558                    failed += 1;
559                }
560            } else {
561                // Peer not connected, requeue for later
562                self.relay_queue.requeue_failed(item);
563                // Record peer not found error
564                self.relay_stats_collector.record_error("peer_not_found");
565                failed += 1;
566            }
567        }
568
569        // Clean up expired requests
570        let expired = self.relay_queue.cleanup_expired(now);
571        if expired > 0 {
572            self.relay_stats.requests_timed_out += expired as u64;
573            // Record timeout errors for each expired request
574            for _ in 0..expired {
575                self.relay_stats_collector.record_error("request_timeout");
576            }
577            debug!("Cleaned up {} expired relay requests", expired);
578        }
579
580        self.relay_stats.current_queue_size = self.relay_queue.len();
581
582        if processed > 0 || failed > 0 {
583            trace!(
584                "Relay queue processing: {} processed, {} failed, {} in queue",
585                processed,
586                failed,
587                self.relay_queue.len()
588            );
589        }
590    }
591
592    /// Get relay statistics for monitoring
593    pub fn relay_stats(&self) -> &RelayStats {
594        &self.relay_stats
595    }
596
597    /// Get comprehensive relay statistics for monitoring and analysis
598    pub fn comprehensive_relay_stats(&self) -> crate::relay::RelayStatistics {
599        // Update the collector with current queue stats before collecting
600        self.relay_stats_collector
601            .update_queue_stats(&self.relay_stats);
602        self.relay_stats_collector.collect_statistics()
603    }
604
605    /// Get relay statistics collector for external registration of components
606    pub fn relay_stats_collector(&self) -> &RelayStatisticsCollector {
607        &self.relay_stats_collector
608    }
609
610    /// Get relay queue length
611    pub fn relay_queue_len(&self) -> usize {
612        self.relay_queue.len()
613    }
614
615    /// Process `EndpointEvent`s emitted from related `Connection`s
616    ///
617    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
618    pub fn handle_event(
619        &mut self,
620        ch: ConnectionHandle,
621        event: EndpointEvent,
622    ) -> Option<ConnectionEvent> {
623        use EndpointEventInner::*;
624        match event.0 {
625            EndpointEventInner::NeedIdentifiers(now, n) => {
626                return Some(self.send_new_identifiers(now, ch, n));
627            }
628            ResetToken(remote, token) => {
629                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
630                    self.index.connection_reset_tokens.remove(old.0, old.1);
631                }
632                if self.index.connection_reset_tokens.insert(remote, token, ch) {
633                    warn!("duplicate reset token");
634                }
635            }
636            RetireConnectionId(now, seq, allow_more_cids) => {
637                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
638                    trace!("peer retired CID {}: {}", seq, cid);
639                    self.index.retire(cid);
640                    if allow_more_cids {
641                        return Some(self.send_new_identifiers(now, ch, 1));
642                    }
643                }
644            }
645            RelayPunchMeNow(target_peer_id, punch_me_now) => {
646                // Handle relay request from bootstrap node
647                let peer_id = PeerId(target_peer_id);
648                if self.queue_frame_for_peer(&peer_id, punch_me_now) {
649                    trace!(
650                        "Successfully queued PunchMeNow frame for relay to peer {:?}",
651                        peer_id
652                    );
653                } else {
654                    warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
655                }
656            }
657            SendAddressFrame(add_address_frame) => {
658                // Convert to a connection event so the connection queues the frame for transmit
659                return Some(ConnectionEvent(ConnectionEventInner::QueueAddAddress(
660                    add_address_frame,
661                )));
662            }
663            NatCandidateValidated { address, challenge } => {
664                // Handle successful NAT traversal candidate validation
665                trace!(
666                    "NAT candidate validation succeeded for {} with challenge {:016x}",
667                    address, challenge
668                );
669
670                // The validation success is primarily handled by the connection-level state machine
671                // This event serves as notification to the endpoint for potential coordination
672                // with other components or logging/metrics collection
673                debug!("NAT candidate {} validated successfully", address);
674            }
675            TryConnectTo {
676                request_id,
677                target_address,
678                timeout_ms,
679                requester_connection,
680                requested_at: _,
681            } => {
682                // Handle TryConnectTo request from a peer
683                // This is used for NAT callback testing - a peer asks us to try connecting
684                // to a target to verify connectivity
685                trace!(
686                    "TryConnectTo request received: request_id={}, target={}, timeout={}ms, from={}",
687                    request_id, target_address, timeout_ms, requester_connection
688                );
689
690                // Since the endpoint is synchronous and we can't spawn async tasks here,
691                // we'll queue a response. The actual connection attempt would need to be
692                // handled by the higher-level async runtime.
693                // For now, we queue a "not implemented" response to acknowledge the request.
694                debug!(
695                    "TryConnectTo: endpoint received callback request for {}",
696                    target_address
697                );
698
699                // TODO: In the async wrapper (high_level/mod.rs), implement the actual
700                // connection attempt and send back the TryConnectToResponse.
701                // For now, this event is acknowledged but not acted upon at the endpoint level.
702            }
703            Drained => {
704                if let Some(conn) = self.connections.try_remove(ch.0) {
705                    self.index.remove(&conn);
706                    // Clean up peer connection mapping if this connection has a peer ID
707                    if let Some(peer_id) = conn.peer_id {
708                        self.peer_connections.remove(&peer_id);
709                        trace!("Cleaned up peer connection mapping for {:?}", peer_id);
710                    }
711                } else {
712                    // This indicates a bug in downstream code, which could cause spurious
713                    // connection loss instead of this error if the CID was (re)allocated prior to
714                    // the illegal call.
715                    error!(id = ch.0, "unknown connection drained");
716                }
717            }
718        }
719        None
720    }
721
722    /// Process an incoming UDP datagram
723    pub fn handle(
724        &mut self,
725        now: Instant,
726        remote: SocketAddr,
727        local_ip: Option<IpAddr>,
728        ecn: Option<EcnCodepoint>,
729        data: BytesMut,
730        buf: &mut Vec<u8>,
731    ) -> Option<DatagramEvent> {
732        // Partially decode packet or short-circuit if unable
733        let datagram_len = data.len();
734        let event = match PartialDecode::new(
735            data,
736            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
737            &self.config.supported_versions,
738            self.config.grease_quic_bit,
739        ) {
740            Ok((first_decode, remaining)) => DatagramConnectionEvent {
741                now,
742                remote,
743                ecn,
744                first_decode,
745                remaining,
746            },
747            Err(PacketDecodeError::UnsupportedVersion {
748                src_cid,
749                dst_cid,
750                version,
751            }) => {
752                if self.server_config.is_none() {
753                    debug!("dropping packet with unsupported version");
754                    return None;
755                }
756                trace!("sending version negotiation");
757                // Negotiate versions
758                Header::VersionNegotiate {
759                    random: self.rng.r#gen::<u8>() | 0x40,
760                    src_cid: dst_cid,
761                    dst_cid: src_cid,
762                }
763                .encode(buf);
764                // Grease with a reserved version
765                buf.write::<u32>(match version {
766                    0x0a1a_2a3a => 0x0a1a_2a4a,
767                    _ => 0x0a1a_2a3a,
768                });
769                for &version in &self.config.supported_versions {
770                    buf.write(version);
771                }
772                return Some(DatagramEvent::Response(Transmit {
773                    destination: remote,
774                    ecn: None,
775                    size: buf.len(),
776                    segment_size: None,
777                    src_ip: local_ip,
778                }));
779            }
780            Err(e) => {
781                trace!("malformed header: {}", e);
782                return None;
783            }
784        };
785
786        let addresses = FourTuple { remote, local_ip };
787        let dst_cid = event.first_decode.dst_cid();
788
789        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
790            // Handle packet on existing connection
791            match route_to {
792                RouteDatagramTo::Incoming(incoming_idx) => {
793                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
794                    let Some(config) = &self.server_config else {
795                        debug!("no server config available to buffer incoming datagram");
796                        return None;
797                    };
798
799                    if incoming_buffer
800                        .total_bytes
801                        .checked_add(datagram_len as u64)
802                        .is_some_and(|n| n <= config.incoming_buffer_size)
803                        && self
804                            .all_incoming_buffers_total_bytes
805                            .checked_add(datagram_len as u64)
806                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
807                    {
808                        incoming_buffer.datagrams.push(event);
809                        incoming_buffer.total_bytes += datagram_len as u64;
810                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
811                    }
812
813                    None
814                }
815                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
816                    ch,
817                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
818                )),
819            }
820        } else if event.first_decode.initial_header().is_some() {
821            // Potentially create a new connection
822
823            self.handle_first_packet(datagram_len, event, addresses, buf)
824        } else if event.first_decode.has_long_header() {
825            debug!(
826                "ignoring non-initial packet for unknown connection {}",
827                dst_cid
828            );
829            None
830        } else if !event.first_decode.is_initial()
831            && self.local_cid_generator.validate(dst_cid).is_err()
832        {
833            // If we got this far, we're receiving a seemingly valid packet for an unknown
834            // connection. Send a stateless reset if possible.
835
836            debug!("dropping packet with invalid CID");
837            None
838        } else if dst_cid.is_empty() {
839            trace!("dropping unrecognized short packet without ID");
840            None
841        } else {
842            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
843                .map(DatagramEvent::Response)
844        }
845    }
846
847    fn stateless_reset(
848        &mut self,
849        now: Instant,
850        inciting_dgram_len: usize,
851        addresses: FourTuple,
852        dst_cid: ConnectionId,
853        buf: &mut Vec<u8>,
854    ) -> Option<Transmit> {
855        if self
856            .last_stateless_reset
857            .is_some_and(|last| last + self.config.min_reset_interval > now)
858        {
859            debug!("ignoring unexpected packet within minimum stateless reset interval");
860            return None;
861        }
862
863        /// Minimum amount of padding for the stateless reset to look like a short-header packet
864        const MIN_PADDING_LEN: usize = 5;
865
866        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
867        // smaller than the inciting packet.
868        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
869            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
870            _ => {
871                debug!(
872                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
873                    inciting_dgram_len
874                );
875                return None;
876            }
877        };
878
879        debug!(
880            "sending stateless reset for {} to {}",
881            dst_cid, addresses.remote
882        );
883        self.last_stateless_reset = Some(now);
884        // Resets with at least this much padding can't possibly be distinguished from real packets
885        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
886        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
887            max_padding_len
888        } else {
889            self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
890        };
891        buf.reserve(padding_len + RESET_TOKEN_SIZE);
892        buf.resize(padding_len, 0);
893        self.rng.fill_bytes(&mut buf[0..padding_len]);
894        buf[0] = 0b0100_0000 | (buf[0] >> 2);
895        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
896
897        debug_assert!(buf.len() < inciting_dgram_len);
898
899        Some(Transmit {
900            destination: addresses.remote,
901            ecn: None,
902            size: buf.len(),
903            segment_size: None,
904            src_ip: addresses.local_ip,
905        })
906    }
907
908    /// Initiate a connection
909    pub fn connect(
910        &mut self,
911        now: Instant,
912        config: ClientConfig,
913        remote: SocketAddr,
914        server_name: &str,
915    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
916        if self.cids_exhausted() {
917            return Err(ConnectError::CidsExhausted);
918        }
919        if remote.port() == 0 || remote.ip().is_unspecified() {
920            return Err(ConnectError::InvalidRemoteAddress(remote));
921        }
922        if !self.config.supported_versions.contains(&config.version) {
923            return Err(ConnectError::UnsupportedVersion);
924        }
925
926        let remote_id = (config.initial_dst_cid_provider)();
927        trace!(initial_dcid = %remote_id);
928
929        let ch = ConnectionHandle(self.connections.vacant_key());
930        let loc_cid = self.new_cid(ch);
931        let params = TransportParameters::new(
932            &config.transport,
933            &self.config,
934            self.local_cid_generator.as_ref(),
935            loc_cid,
936            None,
937            &mut self.rng,
938        )?;
939        let tls = config
940            .crypto
941            .start_session(config.version, server_name, &params)?;
942
943        let conn = self.add_connection(
944            ch,
945            config.version,
946            remote_id,
947            loc_cid,
948            remote_id,
949            FourTuple {
950                remote,
951                local_ip: None,
952            },
953            now,
954            tls,
955            config.transport,
956            SideArgs::Client {
957                token_store: config.token_store,
958                server_name: server_name.into(),
959            },
960        );
961        Ok((ch, conn))
962    }
963
964    fn send_new_identifiers(
965        &mut self,
966        now: Instant,
967        ch: ConnectionHandle,
968        num: u64,
969    ) -> ConnectionEvent {
970        let mut ids = vec![];
971        for _ in 0..num {
972            let id = self.new_cid(ch);
973            let meta = &mut self.connections[ch];
974            let sequence = meta.cids_issued;
975            meta.cids_issued += 1;
976            meta.loc_cids.insert(sequence, id);
977            ids.push(IssuedCid {
978                sequence,
979                id,
980                reset_token: ResetToken::new(&*self.config.reset_key, id),
981            });
982        }
983        ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
984    }
985
986    /// Generate a connection ID for `ch`
987    fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
988        loop {
989            let cid = self.local_cid_generator.generate_cid();
990            if cid.is_empty() {
991                // Zero-length CID; nothing to track
992                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
993                return cid;
994            }
995            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
996                e.insert(ch);
997                break cid;
998            }
999        }
1000    }
1001
1002    fn handle_first_packet(
1003        &mut self,
1004        datagram_len: usize,
1005        event: DatagramConnectionEvent,
1006        addresses: FourTuple,
1007        buf: &mut Vec<u8>,
1008    ) -> Option<DatagramEvent> {
1009        let dst_cid = event.first_decode.dst_cid();
1010        let Some(header) = event.first_decode.initial_header() else {
1011            debug!(
1012                "unable to extract initial header for connection {}",
1013                dst_cid
1014            );
1015            return None;
1016        };
1017
1018        let crypto = {
1019            let Some(server_config) = &self.server_config else {
1020                debug!("packet for unrecognized connection {}", dst_cid);
1021                return self
1022                    .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
1023                    .map(DatagramEvent::Response);
1024            };
1025            if datagram_len < MIN_INITIAL_SIZE as usize {
1026                debug!("ignoring short initial for connection {}", dst_cid);
1027                return None;
1028            }
1029            match server_config.crypto.initial_keys(header.version, dst_cid) {
1030                Ok(keys) => keys,
1031                Err(UnsupportedVersion) => {
1032                    debug!(
1033                        "ignoring initial packet version {:#x} unsupported by cryptographic layer",
1034                        header.version
1035                    );
1036                    return None;
1037                }
1038            }
1039        };
1040
1041        if let Err(reason) = self.early_validate_first_packet(header) {
1042            return Some(DatagramEvent::Response(self.initial_close(
1043                header.version,
1044                addresses,
1045                &crypto,
1046                &header.src_cid,
1047                reason,
1048                buf,
1049            )));
1050        }
1051
1052        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
1053            Ok(packet) => packet,
1054            Err(e) => {
1055                trace!("unable to decode initial packet: {}", e);
1056                return None;
1057            }
1058        };
1059
1060        if !packet.reserved_bits_valid() {
1061            debug!("dropping connection attempt with invalid reserved bits");
1062            return None;
1063        }
1064
1065        let Header::Initial(header) = packet.header else {
1066            debug!("unexpected non-initial packet in handle_first_packet()");
1067            return None;
1068        };
1069
1070        let token = match self.server_config.as_ref() {
1071            Some(sc) => match IncomingToken::from_header(&header, sc, addresses.remote) {
1072                Ok(token) => token,
1073                Err(InvalidRetryTokenError) => {
1074                    debug!("rejecting invalid retry token");
1075                    return Some(DatagramEvent::Response(self.initial_close(
1076                        header.version,
1077                        addresses,
1078                        &crypto,
1079                        &header.src_cid,
1080                        TransportError::INVALID_TOKEN(""),
1081                        buf,
1082                    )));
1083                }
1084            },
1085            None => {
1086                debug!("rejecting invalid retry token");
1087                return Some(DatagramEvent::Response(self.initial_close(
1088                    header.version,
1089                    addresses,
1090                    &crypto,
1091                    &header.src_cid,
1092                    TransportError::INVALID_TOKEN(""),
1093                    buf,
1094                )));
1095            }
1096        };
1097
1098        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
1099        self.index
1100            .insert_initial_incoming(header.dst_cid, incoming_idx);
1101
1102        Some(DatagramEvent::NewConnection(Incoming {
1103            received_at: event.now,
1104            addresses,
1105            ecn: event.ecn,
1106            packet: InitialPacket {
1107                header,
1108                header_data: packet.header_data,
1109                payload: packet.payload,
1110            },
1111            rest: event.remaining,
1112            crypto,
1113            token,
1114            incoming_idx,
1115            improper_drop_warner: IncomingImproperDropWarner,
1116        }))
1117    }
1118
1119    /// Attempt to accept this incoming connection (an error may still occur)
1120    // AcceptError cannot be made smaller without semver breakage
1121    #[allow(clippy::result_large_err)]
1122    pub fn accept(
1123        &mut self,
1124        mut incoming: Incoming,
1125        now: Instant,
1126        buf: &mut Vec<u8>,
1127        server_config: Option<Arc<ServerConfig>>,
1128    ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1129        let remote_address_validated = incoming.remote_address_validated();
1130        incoming.improper_drop_warner.dismiss();
1131        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1132        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1133
1134        let packet_number = incoming.packet.header.number.expand(0);
1135        let InitialHeader {
1136            src_cid,
1137            dst_cid,
1138            version,
1139            ..
1140        } = incoming.packet.header;
1141        let server_config = match server_config.or_else(|| self.server_config.clone()) {
1142            Some(sc) => sc,
1143            None => {
1144                return Err(AcceptError {
1145                    cause: ConnectionError::TransportError(
1146                        crate::transport_error::Error::INTERNAL_ERROR(""),
1147                    ),
1148                    response: None,
1149                });
1150            }
1151        };
1152
1153        if server_config
1154            .transport
1155            .max_idle_timeout
1156            .is_some_and(|timeout| {
1157                incoming.received_at + Duration::from_millis(timeout.into()) <= now
1158            })
1159        {
1160            debug!("abandoning accept of stale initial");
1161            self.index.remove_initial(dst_cid);
1162            return Err(AcceptError {
1163                cause: ConnectionError::TimedOut,
1164                response: None,
1165            });
1166        }
1167
1168        if self.cids_exhausted() {
1169            debug!("refusing connection");
1170            self.index.remove_initial(dst_cid);
1171            return Err(AcceptError {
1172                cause: ConnectionError::CidsExhausted,
1173                response: Some(self.initial_close(
1174                    version,
1175                    incoming.addresses,
1176                    &incoming.crypto,
1177                    &src_cid,
1178                    TransportError::CONNECTION_REFUSED(""),
1179                    buf,
1180                )),
1181            });
1182        }
1183
1184        if incoming
1185            .crypto
1186            .packet
1187            .remote
1188            .decrypt(
1189                packet_number,
1190                &incoming.packet.header_data,
1191                &mut incoming.packet.payload,
1192            )
1193            .is_err()
1194        {
1195            debug!(packet_number, "failed to authenticate initial packet");
1196            self.index.remove_initial(dst_cid);
1197            return Err(AcceptError {
1198                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1199                response: None,
1200            });
1201        };
1202
1203        let ch = ConnectionHandle(self.connections.vacant_key());
1204        let loc_cid = self.new_cid(ch);
1205        let mut params = TransportParameters::new(
1206            &server_config.transport,
1207            &self.config,
1208            self.local_cid_generator.as_ref(),
1209            loc_cid,
1210            Some(&server_config),
1211            &mut self.rng,
1212        )?;
1213        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1214        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1215        params.retry_src_cid = incoming.token.retry_src_cid;
1216        let mut pref_addr_cid = None;
1217        if server_config.has_preferred_address() {
1218            let cid = self.new_cid(ch);
1219            pref_addr_cid = Some(cid);
1220            params.preferred_address = Some(PreferredAddress {
1221                address_v4: server_config.preferred_address_v4,
1222                address_v6: server_config.preferred_address_v6,
1223                connection_id: cid,
1224                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1225            });
1226        }
1227
1228        let tls = server_config.crypto.clone().start_session(version, &params);
1229        let transport_config = server_config.transport.clone();
1230        let mut conn = self.add_connection(
1231            ch,
1232            version,
1233            dst_cid,
1234            loc_cid,
1235            src_cid,
1236            incoming.addresses,
1237            incoming.received_at,
1238            tls,
1239            transport_config,
1240            SideArgs::Server {
1241                server_config,
1242                pref_addr_cid,
1243                path_validated: remote_address_validated,
1244            },
1245        );
1246        self.index.insert_initial(dst_cid, ch);
1247
1248        match conn.handle_first_packet(
1249            incoming.received_at,
1250            incoming.addresses.remote,
1251            incoming.ecn,
1252            packet_number,
1253            incoming.packet,
1254            incoming.rest,
1255        ) {
1256            Ok(()) => {
1257                trace!(id = ch.0, icid = %dst_cid, "new connection");
1258
1259                for event in incoming_buffer.datagrams {
1260                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1261                }
1262
1263                Ok((ch, conn))
1264            }
1265            Err(e) => {
1266                debug!("handshake failed: {}", e);
1267                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1268                let response = match e {
1269                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
1270                        version,
1271                        incoming.addresses,
1272                        &incoming.crypto,
1273                        &src_cid,
1274                        e.clone(),
1275                        buf,
1276                    )),
1277                    _ => None,
1278                };
1279                Err(AcceptError { cause: e, response })
1280            }
1281        }
1282    }
1283
1284    /// Check if we should refuse a connection attempt regardless of the packet's contents
1285    fn early_validate_first_packet(
1286        &mut self,
1287        header: &ProtectedInitialHeader,
1288    ) -> Result<(), TransportError> {
1289        let Some(config) = &self.server_config else {
1290            return Err(TransportError::INTERNAL_ERROR(""));
1291        };
1292        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1293            return Err(TransportError::CONNECTION_REFUSED(""));
1294        }
1295
1296        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
1297        // bytes. If this is a Retry packet, then the length must instead match our usual CID
1298        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
1299        // also need to validate CID length for those after decoding the token.
1300        if header.dst_cid.len() < 8
1301            && (header.token_pos.is_empty()
1302                || header.dst_cid.len() != self.local_cid_generator.cid_len())
1303        {
1304            debug!(
1305                "rejecting connection due to invalid DCID length {}",
1306                header.dst_cid.len()
1307            );
1308            return Err(TransportError::PROTOCOL_VIOLATION(
1309                "invalid destination CID length",
1310            ));
1311        }
1312
1313        Ok(())
1314    }
1315
1316    /// Reject this incoming connection attempt
1317    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1318        self.clean_up_incoming(&incoming);
1319        incoming.improper_drop_warner.dismiss();
1320
1321        self.initial_close(
1322            incoming.packet.header.version,
1323            incoming.addresses,
1324            &incoming.crypto,
1325            &incoming.packet.header.src_cid,
1326            TransportError::CONNECTION_REFUSED(""),
1327            buf,
1328        )
1329    }
1330
1331    /// Respond with a retry packet, requiring the client to retry with address validation
1332    ///
1333    /// Errors if `incoming.may_retry()` is false.
1334    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1335        if !incoming.may_retry() {
1336            return Err(RetryError(Box::new(incoming)));
1337        }
1338
1339        let Some(server_config_arc) = self.server_config.clone() else {
1340            return Err(RetryError(Box::new(incoming)));
1341        };
1342
1343        self.clean_up_incoming(&incoming);
1344        incoming.improper_drop_warner.dismiss();
1345
1346        // First Initial
1347        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
1348        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
1349        // with established connections. In the unlikely event that a collision occurs
1350        // between two connections in the initial phase, both will fail fast and may be
1351        // retried by the application layer.
1352        let loc_cid = self.local_cid_generator.generate_cid();
1353
1354        let payload = TokenPayload::Retry {
1355            address: incoming.addresses.remote,
1356            orig_dst_cid: incoming.packet.header.dst_cid,
1357            issued: server_config_arc.time_source.now(),
1358        };
1359        let token = Token::new(payload, &mut self.rng).encode(&*server_config_arc.token_key);
1360
1361        let header = Header::Retry {
1362            src_cid: loc_cid,
1363            dst_cid: incoming.packet.header.src_cid,
1364            version: incoming.packet.header.version,
1365        };
1366
1367        let encode = header.encode(buf);
1368        buf.put_slice(&token);
1369        buf.extend_from_slice(&server_config_arc.crypto.retry_tag(
1370            incoming.packet.header.version,
1371            &incoming.packet.header.dst_cid,
1372            buf,
1373        ));
1374        encode.finish(buf, &*incoming.crypto.header.local, None);
1375
1376        Ok(Transmit {
1377            destination: incoming.addresses.remote,
1378            ecn: None,
1379            size: buf.len(),
1380            segment_size: None,
1381            src_ip: incoming.addresses.local_ip,
1382        })
1383    }
1384
1385    /// Ignore this incoming connection attempt, not sending any packet in response
1386    ///
1387    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
1388    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
1389    pub fn ignore(&mut self, incoming: Incoming) {
1390        self.clean_up_incoming(&incoming);
1391        incoming.improper_drop_warner.dismiss();
1392    }
1393
1394    /// Clean up endpoint data structures associated with an `Incoming`.
1395    fn clean_up_incoming(&mut self, incoming: &Incoming) {
1396        self.index.remove_initial(incoming.packet.header.dst_cid);
1397        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1398        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1399    }
1400
1401    fn add_connection(
1402        &mut self,
1403        ch: ConnectionHandle,
1404        version: u32,
1405        init_cid: ConnectionId,
1406        loc_cid: ConnectionId,
1407        rem_cid: ConnectionId,
1408        addresses: FourTuple,
1409        now: Instant,
1410        tls: Box<dyn crypto::Session>,
1411        transport_config: Arc<TransportConfig>,
1412        side_args: SideArgs,
1413    ) -> Connection {
1414        let mut rng_seed = [0; 32];
1415        self.rng.fill_bytes(&mut rng_seed);
1416        let side = side_args.side();
1417        let pref_addr_cid = side_args.pref_addr_cid();
1418        let conn = Connection::new(
1419            self.config.clone(),
1420            transport_config,
1421            init_cid,
1422            loc_cid,
1423            rem_cid,
1424            addresses.remote,
1425            addresses.local_ip,
1426            tls,
1427            self.local_cid_generator.as_ref(),
1428            now,
1429            version,
1430            self.allow_mtud,
1431            rng_seed,
1432            side_args,
1433        );
1434
1435        let mut cids_issued = 0;
1436        let mut loc_cids = FxHashMap::default();
1437
1438        loc_cids.insert(cids_issued, loc_cid);
1439        cids_issued += 1;
1440
1441        if let Some(cid) = pref_addr_cid {
1442            debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1443            loc_cids.insert(cids_issued, cid);
1444            cids_issued += 1;
1445        }
1446
1447        let id = self.connections.insert(ConnectionMeta {
1448            init_cid,
1449            cids_issued,
1450            loc_cids,
1451            addresses,
1452            side,
1453            reset_token: None,
1454            peer_id: None,
1455        });
1456        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1457
1458        self.index.insert_conn(addresses, loc_cid, ch, side);
1459
1460        conn
1461    }
1462
1463    fn initial_close(
1464        &mut self,
1465        version: u32,
1466        addresses: FourTuple,
1467        crypto: &Keys,
1468        remote_id: &ConnectionId,
1469        reason: TransportError,
1470        buf: &mut Vec<u8>,
1471    ) -> Transmit {
1472        // We don't need to worry about CID collisions in initial closes because the peer
1473        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
1474        // unexpected response.
1475        let local_id = self.local_cid_generator.generate_cid();
1476        let number = PacketNumber::U8(0);
1477        let header = Header::Initial(InitialHeader {
1478            dst_cid: *remote_id,
1479            src_cid: local_id,
1480            number,
1481            token: Bytes::new(),
1482            version,
1483        });
1484
1485        let partial_encode = header.encode(buf);
1486        let max_len =
1487            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1488        frame::Close::from(reason).encode(buf, max_len);
1489        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1490        partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1491        Transmit {
1492            destination: addresses.remote,
1493            ecn: None,
1494            size: buf.len(),
1495            segment_size: None,
1496            src_ip: addresses.local_ip,
1497        }
1498    }
1499
1500    /// Access the configuration used by this endpoint
1501    pub fn config(&self) -> &EndpointConfig {
1502        &self.config
1503    }
1504
1505    /// Enable or disable address discovery for this endpoint
1506    ///
1507    /// Address discovery is enabled by default. When enabled, the endpoint will:
1508    /// - Send OBSERVED_ADDRESS frames to peers to inform them of their reflexive addresses
1509    /// - Process received OBSERVED_ADDRESS frames to learn about its own reflexive addresses
1510    /// - Integrate discovered addresses with NAT traversal for improved connectivity
1511    pub fn enable_address_discovery(&mut self, enabled: bool) {
1512        self.address_discovery_enabled = enabled;
1513        // Note: Existing connections will continue with their current setting.
1514        // New connections will use the updated setting.
1515    }
1516
1517    /// Check if address discovery is enabled
1518    pub fn address_discovery_enabled(&self) -> bool {
1519        self.address_discovery_enabled
1520    }
1521
1522    /// Get all discovered addresses across all connections
1523    ///
1524    /// Returns a list of unique socket addresses that have been observed
1525    /// by remote peers and reported via OBSERVED_ADDRESS frames.
1526    ///
1527    /// Note: This returns an empty vector in the current implementation.
1528    /// Applications should track discovered addresses at the connection level.
1529    pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1530        // TODO: Implement address tracking at the endpoint level
1531        Vec::new()
1532    }
1533
1534    /// Set a callback to be invoked when an address change is detected
1535    ///
1536    /// The callback receives the old address (if any) and the new address.
1537    /// Only one callback can be set at a time; setting a new callback replaces the previous one.
1538    pub fn set_address_change_callback<F>(&mut self, callback: F)
1539    where
1540        F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1541    {
1542        self.address_change_callback = Some(Box::new(callback));
1543    }
1544
1545    /// Clear the address change callback
1546    pub fn clear_address_change_callback(&mut self) {
1547        self.address_change_callback = None;
1548    }
1549
1550    /// Get address discovery statistics
1551    ///
1552    /// Note: This returns default statistics in the current implementation.
1553    /// Applications should track statistics at the connection level.
1554    pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1555        // TODO: Implement statistics tracking at the endpoint level
1556        AddressDiscoveryStats::default()
1557    }
1558
1559    /// Number of connections that are currently open
1560    pub fn open_connections(&self) -> usize {
1561        self.connections.len()
1562    }
1563
1564    /// Counter for the number of bytes currently used
1565    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
1566    pub fn incoming_buffer_bytes(&self) -> u64 {
1567        self.all_incoming_buffers_total_bytes
1568    }
1569
1570    #[cfg(test)]
1571    #[allow(dead_code)]
1572    pub(crate) fn known_connections(&self) -> usize {
1573        let x = self.connections.len();
1574        debug_assert_eq!(x, self.index.connection_ids_initial.len());
1575        // Not all connections have known reset tokens
1576        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1577        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
1578        debug_assert!(x >= self.index.incoming_connection_remotes.len());
1579        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1580        x
1581    }
1582
1583    #[cfg(test)]
1584    #[allow(dead_code)]
1585    pub(crate) fn known_cids(&self) -> usize {
1586        self.index.connection_ids.len()
1587    }
1588
1589    /// Whether we've used up 3/4 of the available CID space
1590    ///
1591    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
1592    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
1593    fn cids_exhausted(&self) -> bool {
1594        self.local_cid_generator.cid_len() <= 4
1595            && self.local_cid_generator.cid_len() != 0
1596            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1597                - self.index.connection_ids.len())
1598                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1599    }
1600}
1601
1602impl fmt::Debug for Endpoint {
1603    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1604        fmt.debug_struct("Endpoint")
1605            .field("rng", &self.rng)
1606            .field("index", &self.index)
1607            .field("connections", &self.connections)
1608            .field("config", &self.config)
1609            .field("server_config", &self.server_config)
1610            // incoming_buffers too large
1611            .field("incoming_buffers.len", &self.incoming_buffers.len())
1612            .field(
1613                "all_incoming_buffers_total_bytes",
1614                &self.all_incoming_buffers_total_bytes,
1615            )
1616            .finish()
1617    }
1618}
1619
1620/// Buffered Initial and 0-RTT messages for a pending incoming connection
1621#[derive(Default)]
1622struct IncomingBuffer {
1623    datagrams: Vec<DatagramConnectionEvent>,
1624    total_bytes: u64,
1625}
1626
1627/// Part of protocol state incoming datagrams can be routed to
1628#[derive(Copy, Clone, Debug)]
1629enum RouteDatagramTo {
1630    Incoming(usize),
1631    Connection(ConnectionHandle),
1632}
1633
1634/// Maps packets to existing connections
1635#[derive(Default, Debug)]
1636struct ConnectionIndex {
1637    /// Identifies connections based on the initial DCID the peer utilized
1638    ///
1639    /// Uses a standard `HashMap` to protect against hash collision attacks.
1640    ///
1641    /// Used by the server, not the client.
1642    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1643    /// Identifies connections based on locally created CIDs
1644    ///
1645    /// Uses a cheaper hash function since keys are locally created
1646    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1647    /// Identifies incoming connections with zero-length CIDs
1648    ///
1649    /// Uses a standard `HashMap` to protect against hash collision attacks.
1650    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1651    /// Identifies outgoing connections with zero-length CIDs
1652    ///
1653    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
1654    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
1655    /// may be established per remote. We must omit the local address from the key because we don't
1656    /// necessarily know what address we're sending from, and hence receiving at.
1657    ///
1658    /// Uses a standard `HashMap` to protect against hash collision attacks.
1659    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1660    /// Reset tokens provided by the peer for the CID each connection is currently sending to
1661    ///
1662    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
1663    /// recipient, if any.
1664    connection_reset_tokens: ResetTokenTable,
1665}
1666
1667impl ConnectionIndex {
1668    /// Associate an incoming connection with its initial destination CID
1669    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1670        if dst_cid.is_empty() {
1671            return;
1672        }
1673        self.connection_ids_initial
1674            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1675    }
1676
1677    /// Remove an association with an initial destination CID
1678    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1679        if dst_cid.is_empty() {
1680            return;
1681        }
1682        let removed = self.connection_ids_initial.remove(&dst_cid);
1683        debug_assert!(removed.is_some());
1684    }
1685
1686    /// Associate a connection with its initial destination CID
1687    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1688        if dst_cid.is_empty() {
1689            return;
1690        }
1691        self.connection_ids_initial
1692            .insert(dst_cid, RouteDatagramTo::Connection(connection));
1693    }
1694
1695    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1696    /// its current 4-tuple
1697    fn insert_conn(
1698        &mut self,
1699        addresses: FourTuple,
1700        dst_cid: ConnectionId,
1701        connection: ConnectionHandle,
1702        side: Side,
1703    ) {
1704        match dst_cid.len() {
1705            0 => match side {
1706                Side::Server => {
1707                    self.incoming_connection_remotes
1708                        .insert(addresses, connection);
1709                }
1710                Side::Client => {
1711                    self.outgoing_connection_remotes
1712                        .insert(addresses.remote, connection);
1713                }
1714            },
1715            _ => {
1716                self.connection_ids.insert(dst_cid, connection);
1717            }
1718        }
1719    }
1720
1721    /// Discard a connection ID
1722    fn retire(&mut self, dst_cid: ConnectionId) {
1723        self.connection_ids.remove(&dst_cid);
1724    }
1725
1726    /// Remove all references to a connection
1727    fn remove(&mut self, conn: &ConnectionMeta) {
1728        if conn.side.is_server() {
1729            self.remove_initial(conn.init_cid);
1730        }
1731        for cid in conn.loc_cids.values() {
1732            self.connection_ids.remove(cid);
1733        }
1734        self.incoming_connection_remotes.remove(&conn.addresses);
1735        self.outgoing_connection_remotes
1736            .remove(&conn.addresses.remote);
1737        if let Some((remote, token)) = conn.reset_token {
1738            self.connection_reset_tokens.remove(remote, token);
1739        }
1740    }
1741
1742    /// Find the existing connection that `datagram` should be routed to, if any
1743    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1744        let dst_cid = datagram.dst_cid();
1745        let is_empty_cid = dst_cid.is_empty();
1746
1747        // Fast path: Try most common lookup first (non-empty CID)
1748        if !is_empty_cid {
1749            if let Some(&ch) = self.connection_ids.get(dst_cid) {
1750                return Some(RouteDatagramTo::Connection(ch));
1751            }
1752        }
1753
1754        // Initial/0RTT packet lookup
1755        if datagram.is_initial() || datagram.is_0rtt() {
1756            if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1757                return Some(ch);
1758            }
1759        }
1760
1761        // Empty CID lookup (less common, do after fast path)
1762        if is_empty_cid {
1763            // Check incoming connections first (servers handle more incoming)
1764            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1765                return Some(RouteDatagramTo::Connection(ch));
1766            }
1767            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1768                return Some(RouteDatagramTo::Connection(ch));
1769            }
1770        }
1771
1772        // Stateless reset token lookup (least common, do last)
1773        let data = datagram.data();
1774        if data.len() < RESET_TOKEN_SIZE {
1775            return None;
1776        }
1777        self.connection_reset_tokens
1778            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1779            .cloned()
1780            .map(RouteDatagramTo::Connection)
1781    }
1782}
1783
1784#[derive(Debug)]
1785pub(crate) struct ConnectionMeta {
1786    init_cid: ConnectionId,
1787    /// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
1788    cids_issued: u64,
1789    loc_cids: FxHashMap<u64, ConnectionId>,
1790    /// Remote/local addresses the connection began with
1791    ///
1792    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1793    /// bother keeping it up to date.
1794    addresses: FourTuple,
1795    side: Side,
1796    /// Reset token provided by the peer for the CID we're currently sending to, and the address
1797    /// being sent to
1798    reset_token: Option<(SocketAddr, ResetToken)>,
1799    /// Peer ID for this connection, used for relay functionality
1800    peer_id: Option<PeerId>,
1801}
1802
1803/// Internal identifier for a `Connection` currently associated with an endpoint
1804#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1805pub struct ConnectionHandle(pub usize);
1806
1807impl From<ConnectionHandle> for usize {
1808    fn from(x: ConnectionHandle) -> Self {
1809        x.0
1810    }
1811}
1812
1813impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1814    type Output = ConnectionMeta;
1815    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1816        &self[ch.0]
1817    }
1818}
1819
1820impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1821    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1822        &mut self[ch.0]
1823    }
1824}
1825
1826/// Event resulting from processing a single datagram
1827pub enum DatagramEvent {
1828    /// The datagram is redirected to its `Connection`
1829    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1830    /// The datagram may result in starting a new `Connection`
1831    NewConnection(Incoming),
1832    /// Response generated directly by the endpoint
1833    Response(Transmit),
1834}
1835
1836/// An incoming connection for which the server has not yet begun its part of the handshake.
1837pub struct Incoming {
1838    received_at: Instant,
1839    addresses: FourTuple,
1840    ecn: Option<EcnCodepoint>,
1841    packet: InitialPacket,
1842    rest: Option<BytesMut>,
1843    crypto: Keys,
1844    token: IncomingToken,
1845    incoming_idx: usize,
1846    improper_drop_warner: IncomingImproperDropWarner,
1847}
1848
1849impl Incoming {
1850    /// The local IP address which was used when the peer established the connection
1851    ///
1852    /// This has the same behavior as [`Connection::local_ip`].
1853    pub fn local_ip(&self) -> Option<IpAddr> {
1854        self.addresses.local_ip
1855    }
1856
1857    /// The peer's UDP address
1858    pub fn remote_address(&self) -> SocketAddr {
1859        self.addresses.remote
1860    }
1861
1862    /// Whether the socket address that is initiating this connection has been validated
1863    ///
1864    /// This means that the sender of the initial packet has proved that they can receive traffic
1865    /// sent to `self.remote_address()`.
1866    ///
1867    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1868    /// The inverse is not guaranteed.
1869    pub fn remote_address_validated(&self) -> bool {
1870        self.token.validated
1871    }
1872
1873    /// Whether it is legal to respond with a retry packet
1874    ///
1875    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1876    /// The inverse is not guaranteed.
1877    pub fn may_retry(&self) -> bool {
1878        self.token.retry_src_cid.is_none()
1879    }
1880
1881    /// The original destination connection ID sent by the client
1882    pub fn orig_dst_cid(&self) -> &ConnectionId {
1883        &self.token.orig_dst_cid
1884    }
1885}
1886
1887impl fmt::Debug for Incoming {
1888    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1889        f.debug_struct("Incoming")
1890            .field("addresses", &self.addresses)
1891            .field("ecn", &self.ecn)
1892            // packet doesn't implement debug
1893            // rest is too big and not meaningful enough
1894            .field("token", &self.token)
1895            .field("incoming_idx", &self.incoming_idx)
1896            // improper drop warner contains no information
1897            .finish_non_exhaustive()
1898    }
1899}
1900
1901struct IncomingImproperDropWarner;
1902
1903impl IncomingImproperDropWarner {
1904    fn dismiss(self) {
1905        mem::forget(self);
1906    }
1907}
1908
1909impl Drop for IncomingImproperDropWarner {
1910    fn drop(&mut self) {
1911        warn!(
1912            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1913               (may cause memory leak and eventual inability to accept new connections)"
1914        );
1915    }
1916}
1917
1918/// Errors in the parameters being used to create a new connection
1919///
1920/// These arise before any I/O has been performed.
1921#[derive(Debug, Error, Clone, PartialEq, Eq)]
1922pub enum ConnectError {
1923    /// The endpoint can no longer create new connections
1924    ///
1925    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1926    #[error("endpoint stopping")]
1927    EndpointStopping,
1928    /// The connection could not be created because not enough of the CID space is available
1929    ///
1930    /// Try using longer connection IDs
1931    #[error("CIDs exhausted")]
1932    CidsExhausted,
1933    /// The given server name was malformed
1934    #[error("invalid server name: {0}")]
1935    InvalidServerName(String),
1936    /// The remote [`SocketAddr`] supplied was malformed
1937    ///
1938    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1939    #[error("invalid remote address: {0}")]
1940    InvalidRemoteAddress(SocketAddr),
1941    /// No default client configuration was set up
1942    ///
1943    /// Use `Endpoint::connect_with` to specify a client configuration.
1944    #[error("no default client config")]
1945    NoDefaultClientConfig,
1946    /// The local endpoint does not support the QUIC version specified in the client configuration
1947    #[error("unsupported QUIC version")]
1948    UnsupportedVersion,
1949    /// A TLS-related error occurred during connection establishment
1950    #[error("TLS error: {0}")]
1951    TlsError(String),
1952}
1953
1954/// Error type for attempting to accept an [`Incoming`]
1955#[derive(Debug)]
1956pub struct AcceptError {
1957    /// Underlying error describing reason for failure
1958    pub cause: ConnectionError,
1959    /// Optional response to transmit back
1960    pub response: Option<Transmit>,
1961}
1962
1963impl From<rustls::Error> for ConnectError {
1964    fn from(error: rustls::Error) -> Self {
1965        ConnectError::TlsError(error.to_string())
1966    }
1967}
1968
1969impl From<crate::transport_error::Error> for AcceptError {
1970    fn from(error: crate::transport_error::Error) -> Self {
1971        Self {
1972            cause: ConnectionError::TransportError(error),
1973            response: None,
1974        }
1975    }
1976}
1977
1978/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1979#[derive(Debug, Error)]
1980#[error("retry() with validated Incoming")]
1981pub struct RetryError(Box<Incoming>);
1982
1983impl RetryError {
1984    /// Get the [`Incoming`]
1985    pub fn into_incoming(self) -> Incoming {
1986        *self.0
1987    }
1988}
1989
1990/// Reset Tokens which are associated with peer socket addresses
1991///
1992/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1993/// peer generated and might be usable for hash collision attacks.
1994#[derive(Default, Debug)]
1995struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1996
1997impl ResetTokenTable {
1998    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1999        self.0
2000            .entry(remote)
2001            .or_default()
2002            .insert(token, ch)
2003            .is_some()
2004    }
2005
2006    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
2007        use std::collections::hash_map::Entry;
2008        match self.0.entry(remote) {
2009            Entry::Vacant(_) => {}
2010            Entry::Occupied(mut e) => {
2011                e.get_mut().remove(&token);
2012                if e.get().is_empty() {
2013                    e.remove_entry();
2014                }
2015            }
2016        }
2017    }
2018
2019    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
2020        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
2021        self.0.get(&remote)?.get(&token)
2022    }
2023}
2024
2025/// Identifies a connection by the combination of remote and local addresses
2026///
2027/// Including the local ensures good behavior when the host has multiple IP addresses on the same
2028/// subnet and zero-length connection IDs are in use.
2029#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
2030struct FourTuple {
2031    remote: SocketAddr,
2032    // A single socket can only listen on a single port, so no need to store it explicitly
2033    local_ip: Option<IpAddr>,
2034}