ant_quic/
endpoint.rs

1use std::{
2    collections::{HashMap, hash_map, VecDeque},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21    Side, Transmit, TransportConfig, TransportError,
22    cid_generator::ConnectionIdGenerator,
23    coding::BufMutExt,
24    config::{ClientConfig, EndpointConfig, ServerConfig},
25    connection::{Connection, ConnectionError, SideArgs},
26    crypto::{self, Keys, UnsupportedVersion},
27    frame,
28    nat_traversal_api::PeerId,
29    packet::{
30        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31        PacketNumber, PartialDecode, ProtectedInitialHeader,
32    },
33    shared::{
34        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35        EndpointEvent, EndpointEventInner, IssuedCid,
36    },
37    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38    transport_parameters::{PreferredAddress, TransportParameters},
39};
40
41/// A queued relay request for bootstrap nodes
42#[derive(Debug, Clone)]
43struct RelayQueueItem {
44    /// Target peer ID for the relay
45    target_peer_id: PeerId,
46    /// Frame to be relayed
47    frame: frame::PunchMeNow,
48    /// When this relay request was created
49    created_at: Instant,
50    /// Number of relay attempts made
51    attempts: u32,
52    /// Last attempt time
53    last_attempt: Option<Instant>,
54}
55
56/// Relay queue management for bootstrap nodes
57#[derive(Debug)]
58struct RelayQueue {
59    /// Pending relay requests with insertion order and O(1) access
60    pending: IndexMap<u64, RelayQueueItem>,
61    /// Next sequence number for insertion order
62    next_seq: u64,
63    /// Maximum queue size to prevent memory exhaustion
64    max_queue_size: usize,
65    /// Timeout for relay requests
66    request_timeout: Duration,
67    /// Maximum retry attempts per request
68    max_retries: u32,
69    /// Minimum interval between retry attempts
70    retry_interval: Duration,
71    /// Rate limiting: track recent relay requests per peer
72    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
73    /// Maximum relays per peer per time window
74    max_relays_per_peer: usize,
75    /// Rate limiting time window
76    rate_limit_window: Duration,
77}
78
79/// Relay statistics for monitoring and debugging
80#[derive(Debug, Default)]
81pub struct RelayStats {
82    /// Total relay requests received
83    requests_received: u64,
84    /// Successfully relayed requests
85    requests_relayed: u64,
86    /// Failed relay requests (peer not found)
87    requests_failed: u64,
88    /// Requests dropped due to queue full
89    requests_dropped: u64,
90    /// Requests timed out
91    requests_timed_out: u64,
92    /// Requests dropped due to rate limiting
93    requests_rate_limited: u64,
94    /// Current queue size
95    current_queue_size: usize,
96}
97
98impl RelayQueue {
99    /// Create a new relay queue with default settings
100    fn new() -> Self {
101        Self {
102            pending: IndexMap::new(),
103            next_seq: 0,
104            max_queue_size: 1000, // Reasonable default
105            request_timeout: Duration::from_secs(30), // 30 second timeout
106            max_retries: 3,
107            retry_interval: Duration::from_millis(500), // 500ms between retries
108            rate_limiter: HashMap::new(),
109            max_relays_per_peer: 10, // Max 10 relays per peer per time window
110            rate_limit_window: Duration::from_secs(60), // 1 minute window
111        }
112    }
113
114    /// Add a relay request to the queue
115    fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
116        // Check queue size limit
117        if self.pending.len() >= self.max_queue_size {
118            warn!("Relay queue full, dropping request for peer {:?}", target_peer_id);
119            return false;
120        }
121
122        // Check rate limit for this peer
123        if !self.check_rate_limit(target_peer_id, now) {
124            warn!("Rate limit exceeded for peer {:?}, dropping relay request", target_peer_id);
125            return false;
126        }
127
128        let item = RelayQueueItem {
129            target_peer_id,
130            frame,
131            created_at: now,
132            attempts: 0,
133            last_attempt: None,
134        };
135
136        let seq = self.next_seq;
137        self.next_seq += 1;
138        self.pending.insert(seq, item);
139        
140        // Record this request for rate limiting
141        self.record_relay_request(target_peer_id, now);
142        
143        trace!("Queued relay request for peer {:?}, queue size: {}", target_peer_id, self.pending.len());
144        true
145    }
146
147    /// Check if a relay request is within rate limits
148    fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
149        // Clean up old entries first
150        self.cleanup_rate_limiter(now);
151        
152        // Check current request count for this peer
153        if let Some(requests) = self.rate_limiter.get(&peer_id) {
154            requests.len() < self.max_relays_per_peer
155        } else {
156            true // No previous requests, allow
157        }
158    }
159
160    /// Record a relay request for rate limiting
161    fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
162        self.rate_limiter.entry(peer_id).or_insert_with(VecDeque::new).push_back(now);
163    }
164
165    /// Clean up old rate limiting entries
166    fn cleanup_rate_limiter(&mut self, now: Instant) {
167        self.rate_limiter.retain(|_, requests| {
168            requests.retain(|&request_time| now.saturating_duration_since(request_time) <= self.rate_limit_window);
169            !requests.is_empty()
170        });
171    }
172
173    /// Get the next relay request that's ready to be processed
174    fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
175        // Find the first request that's ready to be retried
176        let mut expired_keys = Vec::new();
177        let mut ready_key = None;
178        
179        for (seq, item) in &self.pending {
180            // Check if request has timed out
181            if now.saturating_duration_since(item.created_at) > self.request_timeout {
182                expired_keys.push(*seq);
183                continue;
184            }
185
186            // Check if it's ready for retry
187            if item.attempts == 0 || 
188               item.last_attempt.map_or(true, |last| 
189                   now.saturating_duration_since(last) >= self.retry_interval) {
190                ready_key = Some(*seq);
191                break;
192            }
193        }
194        
195        // Remove expired items
196        for key in expired_keys {
197            if let Some(expired) = self.pending.shift_remove(&key) {
198                debug!("Relay request for peer {:?} timed out after {:?}", 
199                       expired.target_peer_id, 
200                       now.saturating_duration_since(expired.created_at));
201            }
202        }
203        
204        // Return ready item if found
205        if let Some(key) = ready_key {
206            if let Some(mut item) = self.pending.shift_remove(&key) {
207                item.attempts += 1;
208                item.last_attempt = Some(now);
209                return Some(item);
210            }
211        }
212
213        None
214    }
215
216    /// Requeue a failed relay request if it hasn't exceeded max retries
217    fn requeue_failed(&mut self, item: RelayQueueItem) {
218        if item.attempts < self.max_retries {
219            trace!("Requeuing failed relay request for peer {:?}, attempt {}/{}", 
220                   item.target_peer_id, item.attempts, self.max_retries);
221            let seq = self.next_seq;
222            self.next_seq += 1;
223            self.pending.insert(seq, item);
224        } else {
225            debug!("Dropping relay request for peer {:?} after {} failed attempts", 
226                   item.target_peer_id, item.attempts);
227        }
228    }
229
230    /// Clean up expired requests and return number of items cleaned
231    fn cleanup_expired(&mut self, now: Instant) -> usize {
232        let initial_len = self.pending.len();
233        
234        // Collect expired keys
235        let expired_keys: Vec<u64> = self.pending.iter()
236            .filter_map(|(seq, item)| {
237                if now.saturating_duration_since(item.created_at) > self.request_timeout {
238                    Some(*seq)
239                } else {
240                    None
241                }
242            })
243            .collect();
244        
245        // Remove expired items
246        for key in expired_keys {
247            if let Some(expired) = self.pending.shift_remove(&key) {
248                debug!("Removing expired relay request for peer {:?}", expired.target_peer_id);
249            }
250        }
251
252        initial_len - self.pending.len()
253    }
254
255    /// Get current queue length
256    fn len(&self) -> usize {
257        self.pending.len()
258    }
259
260}
261
262/// The main entry point to the library
263///
264/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
265/// connection-generated events via `handle` and `handle_event`.
266pub struct Endpoint {
267    rng: StdRng,
268    index: ConnectionIndex,
269    connections: Slab<ConnectionMeta>,
270    local_cid_generator: Box<dyn ConnectionIdGenerator>,
271    config: Arc<EndpointConfig>,
272    server_config: Option<Arc<ServerConfig>>,
273    /// Whether the underlying UDP socket promises not to fragment packets
274    allow_mtud: bool,
275    /// Time at which a stateless reset was most recently sent
276    last_stateless_reset: Option<Instant>,
277    /// Buffered Initial and 0-RTT messages for pending incoming connections
278    incoming_buffers: Slab<IncomingBuffer>,
279    all_incoming_buffers_total_bytes: u64,
280    /// Mapping from peer IDs to connection handles for relay functionality
281    peer_connections: HashMap<PeerId, ConnectionHandle>,
282    /// Relay queue for bootstrap nodes
283    relay_queue: RelayQueue,
284    /// Relay statistics
285    relay_stats: RelayStats,
286}
287
288impl Endpoint {
289    /// Create a new endpoint
290    ///
291    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
292    /// better performance. This requires that outgoing packets are never fragmented, which can be
293    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
294    ///
295    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
296    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
297    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
298    /// using [`EndpointConfig::rng_seed`].
299    pub fn new(
300        config: Arc<EndpointConfig>,
301        server_config: Option<Arc<ServerConfig>>,
302        allow_mtud: bool,
303        rng_seed: Option<[u8; 32]>,
304    ) -> Self {
305        let rng_seed = rng_seed.or(config.rng_seed);
306        Self {
307            rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
308            index: ConnectionIndex::default(),
309            connections: Slab::new(),
310            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
311            config,
312            server_config,
313            allow_mtud,
314            last_stateless_reset: None,
315            incoming_buffers: Slab::new(),
316            all_incoming_buffers_total_bytes: 0,
317            peer_connections: HashMap::new(),
318            relay_queue: RelayQueue::new(),
319            relay_stats: RelayStats::default(),
320        }
321    }
322
323    /// Replace the server configuration, affecting new incoming connections only
324    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
325        self.server_config = server_config;
326    }
327
328    /// Register a peer ID with a connection handle for relay functionality
329    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
330        self.peer_connections.insert(peer_id, connection_handle);
331        trace!("Registered peer {:?} with connection {:?}", peer_id, connection_handle);
332    }
333
334    /// Unregister a peer ID from the connection mapping
335    pub fn unregister_peer(&mut self, peer_id: &PeerId) {
336        if let Some(handle) = self.peer_connections.remove(peer_id) {
337            trace!("Unregistered peer {:?} from connection {:?}", peer_id, handle);
338        }
339    }
340
341    /// Look up a connection handle for a given peer ID
342    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
343        self.peer_connections.get(peer_id).copied()
344    }
345
346    /// Queue a frame for relay to a target peer
347    pub(crate) fn queue_frame_for_peer(&mut self, peer_id: &PeerId, frame: frame::PunchMeNow) -> bool {
348        self.relay_stats.requests_received += 1;
349        
350        if let Some(ch) = self.lookup_peer_connection(peer_id) {
351            // Peer is currently connected, try to relay immediately
352            if self.relay_frame_to_connection(ch, frame.clone()) {
353                self.relay_stats.requests_relayed += 1;
354                trace!("Immediately relayed frame to peer {:?} via connection {:?}", peer_id, ch);
355                return true;
356            }
357        }
358        
359        // Peer not connected or immediate relay failed, queue for later
360        let now = Instant::now();
361        if self.relay_queue.enqueue(*peer_id, frame, now) {
362            self.relay_stats.current_queue_size = self.relay_queue.len();
363            trace!("Queued relay request for peer {:?}", peer_id);
364            true
365        } else {
366            // Check if it was rate limited or queue full
367            if !self.relay_queue.check_rate_limit(*peer_id, now) {
368                self.relay_stats.requests_rate_limited += 1;
369            } else {
370                self.relay_stats.requests_dropped += 1;
371            }
372            false
373        }
374    }
375
376    /// Attempt to relay a frame to a specific connection
377    fn relay_frame_to_connection(&mut self, ch: ConnectionHandle, _frame: frame::PunchMeNow) -> bool {
378        // In a complete implementation, this would queue the frame in the connection's
379        // pending frames. For now, we'll just return true to indicate success.
380        // The actual frame queuing would need to be implemented at the connection level.
381        
382        // TODO: Implement actual frame queuing to connection's pending frames
383        trace!("Would relay frame to connection {:?}", ch);
384        true
385    }
386
387    /// Set the peer ID for an existing connection
388    pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
389        if let Some(connection) = self.connections.get_mut(connection_handle.0) {
390            connection.peer_id = Some(peer_id);
391            self.register_peer(peer_id, connection_handle);
392            
393            // Process any queued relay requests for this peer
394            self.process_queued_relays_for_peer(peer_id);
395        }
396    }
397
398    /// Process queued relay requests for a specific peer that just connected
399    fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
400        let _now = Instant::now();
401        let mut processed = 0;
402        
403        // Collect items to process for this peer
404        let mut items_to_process = Vec::new();
405        let mut keys_to_remove = Vec::new();
406        
407        // Find all items for this peer
408        for (seq, item) in &self.relay_queue.pending {
409            if item.target_peer_id == peer_id {
410                items_to_process.push(item.clone());
411                keys_to_remove.push(*seq);
412            }
413        }
414        
415        // Remove items from queue
416        for key in keys_to_remove {
417            self.relay_queue.pending.shift_remove(&key);
418        }
419        
420        // Process the items
421        for item in items_to_process {
422            if let Some(ch) = self.lookup_peer_connection(&peer_id) {
423                if self.relay_frame_to_connection(ch, item.frame.clone()) {
424                    self.relay_stats.requests_relayed += 1;
425                    processed += 1;
426                    trace!("Processed queued relay for peer {:?}", peer_id);
427                } else {
428                    // Failed to relay, requeue
429                    self.relay_queue.requeue_failed(item);
430                    self.relay_stats.requests_failed += 1;
431                }
432            }
433        }
434        
435        self.relay_stats.current_queue_size = self.relay_queue.len();
436        
437        if processed > 0 {
438            debug!("Processed {} queued relay requests for peer {:?}", processed, peer_id);
439        }
440    }
441
442    /// Process pending relay requests (should be called periodically)
443    pub fn process_relay_queue(&mut self) {
444        let now = Instant::now();
445        let mut processed = 0;
446        let mut failed = 0;
447        
448        // Process ready relay requests
449        while let Some(item) = self.relay_queue.next_ready(now) {
450            if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
451                if self.relay_frame_to_connection(ch, item.frame.clone()) {
452                    self.relay_stats.requests_relayed += 1;
453                    processed += 1;
454                    trace!("Successfully relayed frame to peer {:?}", item.target_peer_id);
455                } else {
456                    // Failed to relay, requeue for retry
457                    self.relay_queue.requeue_failed(item);
458                    self.relay_stats.requests_failed += 1;
459                    failed += 1;
460                }
461            } else {
462                // Peer not connected, requeue for later
463                self.relay_queue.requeue_failed(item);
464                failed += 1;
465            }
466        }
467        
468        // Clean up expired requests
469        let expired = self.relay_queue.cleanup_expired(now);
470        if expired > 0 {
471            self.relay_stats.requests_timed_out += expired as u64;
472            debug!("Cleaned up {} expired relay requests", expired);
473        }
474        
475        self.relay_stats.current_queue_size = self.relay_queue.len();
476        
477        if processed > 0 || failed > 0 {
478            trace!("Relay queue processing: {} processed, {} failed, {} in queue", 
479                   processed, failed, self.relay_queue.len());
480        }
481    }
482
483    /// Get relay statistics for monitoring
484    pub fn relay_stats(&self) -> &RelayStats {
485        &self.relay_stats
486    }
487
488    /// Get relay queue length
489    pub fn relay_queue_len(&self) -> usize {
490        self.relay_queue.len()
491    }
492
493    /// Process `EndpointEvent`s emitted from related `Connection`s
494    ///
495    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
496    pub fn handle_event(
497        &mut self,
498        ch: ConnectionHandle,
499        event: EndpointEvent,
500    ) -> Option<ConnectionEvent> {
501        use EndpointEventInner::*;
502        match event.0 {
503            EndpointEventInner::NeedIdentifiers(now, n) => {
504                return Some(self.send_new_identifiers(now, ch, n));
505            }
506            ResetToken(remote, token) => {
507                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
508                    self.index.connection_reset_tokens.remove(old.0, old.1);
509                }
510                if self.index.connection_reset_tokens.insert(remote, token, ch) {
511                    warn!("duplicate reset token");
512                }
513            }
514            RetireConnectionId(now, seq, allow_more_cids) => {
515                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
516                    trace!("peer retired CID {}: {}", seq, cid);
517                    self.index.retire(cid);
518                    if allow_more_cids {
519                        return Some(self.send_new_identifiers(now, ch, 1));
520                    }
521                }
522            }
523            RelayPunchMeNow(target_peer_id, punch_me_now) => {
524                // Handle relay request from bootstrap node
525                let peer_id = PeerId(target_peer_id);
526                if self.queue_frame_for_peer(&peer_id, punch_me_now) {
527                    trace!("Successfully queued PunchMeNow frame for relay to peer {:?}", peer_id);
528                } else {
529                    warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
530                }
531            }
532            SendAddressFrame(add_address_frame) => {
533                // Handle bootstrap node request to send ADD_ADDRESS frame
534                trace!("Sending ADD_ADDRESS frame: seq={}, addr={}, priority={}", 
535                       add_address_frame.sequence, add_address_frame.address, add_address_frame.priority);
536                
537                // For now, log the frame since the queuing mechanism needs more integration
538                // TODO: Implement proper frame queuing in the connection layer
539                debug!("ADD_ADDRESS frame ready for transmission: {:?}", add_address_frame);
540            }
541            NatCandidateValidated { address, challenge } => {
542                // Handle successful NAT traversal candidate validation
543                trace!("NAT candidate validation succeeded for {} with challenge {:016x}", address, challenge);
544                
545                // The validation success is primarily handled by the connection-level state machine
546                // This event serves as notification to the endpoint for potential coordination
547                // with other components or logging/metrics collection
548                debug!("NAT candidate {} validated successfully", address);
549            }
550            Drained => {
551                if let Some(conn) = self.connections.try_remove(ch.0) {
552                    self.index.remove(&conn);
553                    // Clean up peer connection mapping if this connection has a peer ID
554                    if let Some(peer_id) = conn.peer_id {
555                        self.peer_connections.remove(&peer_id);
556                        trace!("Cleaned up peer connection mapping for {:?}", peer_id);
557                    }
558                } else {
559                    // This indicates a bug in downstream code, which could cause spurious
560                    // connection loss instead of this error if the CID was (re)allocated prior to
561                    // the illegal call.
562                    error!(id = ch.0, "unknown connection drained");
563                }
564            }
565        }
566        None
567    }
568
569    /// Process an incoming UDP datagram
570    pub fn handle(
571        &mut self,
572        now: Instant,
573        remote: SocketAddr,
574        local_ip: Option<IpAddr>,
575        ecn: Option<EcnCodepoint>,
576        data: BytesMut,
577        buf: &mut Vec<u8>,
578    ) -> Option<DatagramEvent> {
579        // Partially decode packet or short-circuit if unable
580        let datagram_len = data.len();
581        let event = match PartialDecode::new(
582            data,
583            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
584            &self.config.supported_versions,
585            self.config.grease_quic_bit,
586        ) {
587            Ok((first_decode, remaining)) => DatagramConnectionEvent {
588                now,
589                remote,
590                ecn,
591                first_decode,
592                remaining,
593            },
594            Err(PacketDecodeError::UnsupportedVersion {
595                src_cid,
596                dst_cid,
597                version,
598            }) => {
599                if self.server_config.is_none() {
600                    debug!("dropping packet with unsupported version");
601                    return None;
602                }
603                trace!("sending version negotiation");
604                // Negotiate versions
605                Header::VersionNegotiate {
606                    random: self.rng.gen::<u8>() | 0x40,
607                    src_cid: dst_cid,
608                    dst_cid: src_cid,
609                }
610                .encode(buf);
611                // Grease with a reserved version
612                buf.write::<u32>(match version {
613                    0x0a1a_2a3a => 0x0a1a_2a4a,
614                    _ => 0x0a1a_2a3a,
615                });
616                for &version in &self.config.supported_versions {
617                    buf.write(version);
618                }
619                return Some(DatagramEvent::Response(Transmit {
620                    destination: remote,
621                    ecn: None,
622                    size: buf.len(),
623                    segment_size: None,
624                    src_ip: local_ip,
625                }));
626            }
627            Err(e) => {
628                trace!("malformed header: {}", e);
629                return None;
630            }
631        };
632
633        let addresses = FourTuple { remote, local_ip };
634        let dst_cid = event.first_decode.dst_cid();
635
636        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
637            // Handle packet on existing connection
638            match route_to {
639                RouteDatagramTo::Incoming(incoming_idx) => {
640                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
641                    let config = &self.server_config.as_ref().unwrap();
642
643                    if incoming_buffer
644                        .total_bytes
645                        .checked_add(datagram_len as u64)
646                        .is_some_and(|n| n <= config.incoming_buffer_size)
647                        && self
648                            .all_incoming_buffers_total_bytes
649                            .checked_add(datagram_len as u64)
650                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
651                    {
652                        incoming_buffer.datagrams.push(event);
653                        incoming_buffer.total_bytes += datagram_len as u64;
654                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
655                    }
656
657                    None
658                }
659                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
660                    ch,
661                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
662                )),
663            }
664        } else if event.first_decode.initial_header().is_some() {
665            // Potentially create a new connection
666
667            self.handle_first_packet(datagram_len, event, addresses, buf)
668        } else if event.first_decode.has_long_header() {
669            debug!(
670                "ignoring non-initial packet for unknown connection {}",
671                dst_cid
672            );
673            None
674        } else if !event.first_decode.is_initial()
675            && self.local_cid_generator.validate(dst_cid).is_err()
676        {
677            // If we got this far, we're receiving a seemingly valid packet for an unknown
678            // connection. Send a stateless reset if possible.
679
680            debug!("dropping packet with invalid CID");
681            None
682        } else if dst_cid.is_empty() {
683            trace!("dropping unrecognized short packet without ID");
684            None
685        } else {
686            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
687                .map(DatagramEvent::Response)
688        }
689    }
690
691    fn stateless_reset(
692        &mut self,
693        now: Instant,
694        inciting_dgram_len: usize,
695        addresses: FourTuple,
696        dst_cid: ConnectionId,
697        buf: &mut Vec<u8>,
698    ) -> Option<Transmit> {
699        if self
700            .last_stateless_reset
701            .is_some_and(|last| last + self.config.min_reset_interval > now)
702        {
703            debug!("ignoring unexpected packet within minimum stateless reset interval");
704            return None;
705        }
706
707        /// Minimum amount of padding for the stateless reset to look like a short-header packet
708        const MIN_PADDING_LEN: usize = 5;
709
710        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
711        // smaller than the inciting packet.
712        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
713            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
714            _ => {
715                debug!(
716                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
717                    inciting_dgram_len
718                );
719                return None;
720            }
721        };
722
723        debug!(
724            "sending stateless reset for {} to {}",
725            dst_cid, addresses.remote
726        );
727        self.last_stateless_reset = Some(now);
728        // Resets with at least this much padding can't possibly be distinguished from real packets
729        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
730        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
731            max_padding_len
732        } else {
733            self.rng
734                .gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
735        };
736        buf.reserve(padding_len + RESET_TOKEN_SIZE);
737        buf.resize(padding_len, 0);
738        self.rng.fill_bytes(&mut buf[0..padding_len]);
739        buf[0] = 0b0100_0000 | (buf[0] >> 2);
740        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
741
742        debug_assert!(buf.len() < inciting_dgram_len);
743
744        Some(Transmit {
745            destination: addresses.remote,
746            ecn: None,
747            size: buf.len(),
748            segment_size: None,
749            src_ip: addresses.local_ip,
750        })
751    }
752
753    /// Initiate a connection
754    pub fn connect(
755        &mut self,
756        now: Instant,
757        config: ClientConfig,
758        remote: SocketAddr,
759        server_name: &str,
760    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
761        if self.cids_exhausted() {
762            return Err(ConnectError::CidsExhausted);
763        }
764        if remote.port() == 0 || remote.ip().is_unspecified() {
765            return Err(ConnectError::InvalidRemoteAddress(remote));
766        }
767        if !self.config.supported_versions.contains(&config.version) {
768            return Err(ConnectError::UnsupportedVersion);
769        }
770
771        let remote_id = (config.initial_dst_cid_provider)();
772        trace!(initial_dcid = %remote_id);
773
774        let ch = ConnectionHandle(self.connections.vacant_key());
775        let loc_cid = self.new_cid(ch);
776        let params = TransportParameters::new(
777            &config.transport,
778            &self.config,
779            self.local_cid_generator.as_ref(),
780            loc_cid,
781            None,
782            &mut self.rng,
783        );
784        let tls = config
785            .crypto
786            .start_session(config.version, server_name, &params)?;
787
788        let conn = self.add_connection(
789            ch,
790            config.version,
791            remote_id,
792            loc_cid,
793            remote_id,
794            FourTuple {
795                remote,
796                local_ip: None,
797            },
798            now,
799            tls,
800            config.transport,
801            SideArgs::Client {
802                token_store: config.token_store,
803                server_name: server_name.into(),
804            },
805        );
806        Ok((ch, conn))
807    }
808
809    fn send_new_identifiers(
810        &mut self,
811        now: Instant,
812        ch: ConnectionHandle,
813        num: u64,
814    ) -> ConnectionEvent {
815        let mut ids = vec![];
816        for _ in 0..num {
817            let id = self.new_cid(ch);
818            let meta = &mut self.connections[ch];
819            let sequence = meta.cids_issued;
820            meta.cids_issued += 1;
821            meta.loc_cids.insert(sequence, id);
822            ids.push(IssuedCid {
823                sequence,
824                id,
825                reset_token: ResetToken::new(&*self.config.reset_key, id),
826            });
827        }
828        ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
829    }
830
831    /// Generate a connection ID for `ch`
832    fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
833        loop {
834            let cid = self.local_cid_generator.generate_cid();
835            if cid.is_empty() {
836                // Zero-length CID; nothing to track
837                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
838                return cid;
839            }
840            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
841                e.insert(ch);
842                break cid;
843            }
844        }
845    }
846
847    fn handle_first_packet(
848        &mut self,
849        datagram_len: usize,
850        event: DatagramConnectionEvent,
851        addresses: FourTuple,
852        buf: &mut Vec<u8>,
853    ) -> Option<DatagramEvent> {
854        let dst_cid = event.first_decode.dst_cid();
855        let header = event.first_decode.initial_header().unwrap();
856
857        let Some(server_config) = &self.server_config else {
858            debug!("packet for unrecognized connection {}", dst_cid);
859            return self
860                .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
861                .map(DatagramEvent::Response);
862        };
863
864        if datagram_len < MIN_INITIAL_SIZE as usize {
865            debug!("ignoring short initial for connection {}", dst_cid);
866            return None;
867        }
868
869        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
870            Ok(keys) => keys,
871            Err(UnsupportedVersion) => {
872                // This probably indicates that the user set supported_versions incorrectly in
873                // `EndpointConfig`.
874                debug!(
875                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
876                    header.version
877                );
878                return None;
879            }
880        };
881
882        if let Err(reason) = self.early_validate_first_packet(header) {
883            return Some(DatagramEvent::Response(self.initial_close(
884                header.version,
885                addresses,
886                &crypto,
887                &header.src_cid,
888                reason,
889                buf,
890            )));
891        }
892
893        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
894            Ok(packet) => packet,
895            Err(e) => {
896                trace!("unable to decode initial packet: {}", e);
897                return None;
898            }
899        };
900
901        if !packet.reserved_bits_valid() {
902            debug!("dropping connection attempt with invalid reserved bits");
903            return None;
904        }
905
906        let Header::Initial(header) = packet.header else {
907            panic!("non-initial packet in handle_first_packet()");
908        };
909
910        let server_config = self.server_config.as_ref().unwrap().clone();
911
912        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
913            Ok(token) => token,
914            Err(InvalidRetryTokenError) => {
915                debug!("rejecting invalid retry token");
916                return Some(DatagramEvent::Response(self.initial_close(
917                    header.version,
918                    addresses,
919                    &crypto,
920                    &header.src_cid,
921                    TransportError::INVALID_TOKEN(""),
922                    buf,
923                )));
924            }
925        };
926
927        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
928        self.index
929            .insert_initial_incoming(header.dst_cid, incoming_idx);
930
931        Some(DatagramEvent::NewConnection(Incoming {
932            received_at: event.now,
933            addresses,
934            ecn: event.ecn,
935            packet: InitialPacket {
936                header,
937                header_data: packet.header_data,
938                payload: packet.payload,
939            },
940            rest: event.remaining,
941            crypto,
942            token,
943            incoming_idx,
944            improper_drop_warner: IncomingImproperDropWarner,
945        }))
946    }
947
948    /// Attempt to accept this incoming connection (an error may still occur)
949    // AcceptError cannot be made smaller without semver breakage
950    #[allow(clippy::result_large_err)]
951    pub fn accept(
952        &mut self,
953        mut incoming: Incoming,
954        now: Instant,
955        buf: &mut Vec<u8>,
956        server_config: Option<Arc<ServerConfig>>,
957    ) -> Result<(ConnectionHandle, Connection), AcceptError> {
958        let remote_address_validated = incoming.remote_address_validated();
959        incoming.improper_drop_warner.dismiss();
960        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
961        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
962
963        let packet_number = incoming.packet.header.number.expand(0);
964        let InitialHeader {
965            src_cid,
966            dst_cid,
967            version,
968            ..
969        } = incoming.packet.header;
970        let server_config =
971            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
972
973        if server_config
974            .transport
975            .max_idle_timeout
976            .is_some_and(|timeout| {
977                incoming.received_at + Duration::from_millis(timeout.into()) <= now
978            })
979        {
980            debug!("abandoning accept of stale initial");
981            self.index.remove_initial(dst_cid);
982            return Err(AcceptError {
983                cause: ConnectionError::TimedOut,
984                response: None,
985            });
986        }
987
988        if self.cids_exhausted() {
989            debug!("refusing connection");
990            self.index.remove_initial(dst_cid);
991            return Err(AcceptError {
992                cause: ConnectionError::CidsExhausted,
993                response: Some(self.initial_close(
994                    version,
995                    incoming.addresses,
996                    &incoming.crypto,
997                    &src_cid,
998                    TransportError::CONNECTION_REFUSED(""),
999                    buf,
1000                )),
1001            });
1002        }
1003
1004        if incoming
1005            .crypto
1006            .packet
1007            .remote
1008            .decrypt(
1009                packet_number,
1010                &incoming.packet.header_data,
1011                &mut incoming.packet.payload,
1012            )
1013            .is_err()
1014        {
1015            debug!(packet_number, "failed to authenticate initial packet");
1016            self.index.remove_initial(dst_cid);
1017            return Err(AcceptError {
1018                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1019                response: None,
1020            });
1021        };
1022
1023        let ch = ConnectionHandle(self.connections.vacant_key());
1024        let loc_cid = self.new_cid(ch);
1025        let mut params = TransportParameters::new(
1026            &server_config.transport,
1027            &self.config,
1028            self.local_cid_generator.as_ref(),
1029            loc_cid,
1030            Some(&server_config),
1031            &mut self.rng,
1032        );
1033        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1034        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1035        params.retry_src_cid = incoming.token.retry_src_cid;
1036        let mut pref_addr_cid = None;
1037        if server_config.has_preferred_address() {
1038            let cid = self.new_cid(ch);
1039            pref_addr_cid = Some(cid);
1040            params.preferred_address = Some(PreferredAddress {
1041                address_v4: server_config.preferred_address_v4,
1042                address_v6: server_config.preferred_address_v6,
1043                connection_id: cid,
1044                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1045            });
1046        }
1047
1048        let tls = server_config.crypto.clone().start_session(version, &params);
1049        let transport_config = server_config.transport.clone();
1050        let mut conn = self.add_connection(
1051            ch,
1052            version,
1053            dst_cid,
1054            loc_cid,
1055            src_cid,
1056            incoming.addresses,
1057            incoming.received_at,
1058            tls,
1059            transport_config,
1060            SideArgs::Server {
1061                server_config,
1062                pref_addr_cid,
1063                path_validated: remote_address_validated,
1064            },
1065        );
1066        self.index.insert_initial(dst_cid, ch);
1067
1068        match conn.handle_first_packet(
1069            incoming.received_at,
1070            incoming.addresses.remote,
1071            incoming.ecn,
1072            packet_number,
1073            incoming.packet,
1074            incoming.rest,
1075        ) {
1076            Ok(()) => {
1077                trace!(id = ch.0, icid = %dst_cid, "new connection");
1078
1079                for event in incoming_buffer.datagrams {
1080                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1081                }
1082
1083                Ok((ch, conn))
1084            }
1085            Err(e) => {
1086                debug!("handshake failed: {}", e);
1087                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1088                let response = match e {
1089                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
1090                        version,
1091                        incoming.addresses,
1092                        &incoming.crypto,
1093                        &src_cid,
1094                        e.clone(),
1095                        buf,
1096                    )),
1097                    _ => None,
1098                };
1099                Err(AcceptError { cause: e, response })
1100            }
1101        }
1102    }
1103
1104    /// Check if we should refuse a connection attempt regardless of the packet's contents
1105    fn early_validate_first_packet(
1106        &mut self,
1107        header: &ProtectedInitialHeader,
1108    ) -> Result<(), TransportError> {
1109        let config = &self.server_config.as_ref().unwrap();
1110        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1111            return Err(TransportError::CONNECTION_REFUSED(""));
1112        }
1113
1114        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
1115        // bytes. If this is a Retry packet, then the length must instead match our usual CID
1116        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
1117        // also need to validate CID length for those after decoding the token.
1118        if header.dst_cid.len() < 8
1119            && (header.token_pos.is_empty()
1120                || header.dst_cid.len() != self.local_cid_generator.cid_len())
1121        {
1122            debug!(
1123                "rejecting connection due to invalid DCID length {}",
1124                header.dst_cid.len()
1125            );
1126            return Err(TransportError::PROTOCOL_VIOLATION(
1127                "invalid destination CID length",
1128            ));
1129        }
1130
1131        Ok(())
1132    }
1133
1134    /// Reject this incoming connection attempt
1135    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1136        self.clean_up_incoming(&incoming);
1137        incoming.improper_drop_warner.dismiss();
1138
1139        self.initial_close(
1140            incoming.packet.header.version,
1141            incoming.addresses,
1142            &incoming.crypto,
1143            &incoming.packet.header.src_cid,
1144            TransportError::CONNECTION_REFUSED(""),
1145            buf,
1146        )
1147    }
1148
1149    /// Respond with a retry packet, requiring the client to retry with address validation
1150    ///
1151    /// Errors if `incoming.may_retry()` is false.
1152    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1153        if !incoming.may_retry() {
1154            return Err(RetryError(Box::new(incoming)));
1155        }
1156
1157        self.clean_up_incoming(&incoming);
1158        incoming.improper_drop_warner.dismiss();
1159
1160        let server_config = self.server_config.as_ref().unwrap();
1161
1162        // First Initial
1163        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
1164        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
1165        // with established connections. In the unlikely event that a collision occurs
1166        // between two connections in the initial phase, both will fail fast and may be
1167        // retried by the application layer.
1168        let loc_cid = self.local_cid_generator.generate_cid();
1169
1170        let payload = TokenPayload::Retry {
1171            address: incoming.addresses.remote,
1172            orig_dst_cid: incoming.packet.header.dst_cid,
1173            issued: server_config.time_source.now(),
1174        };
1175        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1176
1177        let header = Header::Retry {
1178            src_cid: loc_cid,
1179            dst_cid: incoming.packet.header.src_cid,
1180            version: incoming.packet.header.version,
1181        };
1182
1183        let encode = header.encode(buf);
1184        buf.put_slice(&token);
1185        buf.extend_from_slice(&server_config.crypto.retry_tag(
1186            incoming.packet.header.version,
1187            &incoming.packet.header.dst_cid,
1188            buf,
1189        ));
1190        encode.finish(buf, &*incoming.crypto.header.local, None);
1191
1192        Ok(Transmit {
1193            destination: incoming.addresses.remote,
1194            ecn: None,
1195            size: buf.len(),
1196            segment_size: None,
1197            src_ip: incoming.addresses.local_ip,
1198        })
1199    }
1200
1201    /// Ignore this incoming connection attempt, not sending any packet in response
1202    ///
1203    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
1204    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
1205    pub fn ignore(&mut self, incoming: Incoming) {
1206        self.clean_up_incoming(&incoming);
1207        incoming.improper_drop_warner.dismiss();
1208    }
1209
1210    /// Clean up endpoint data structures associated with an `Incoming`.
1211    fn clean_up_incoming(&mut self, incoming: &Incoming) {
1212        self.index.remove_initial(incoming.packet.header.dst_cid);
1213        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1214        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1215    }
1216
1217    fn add_connection(
1218        &mut self,
1219        ch: ConnectionHandle,
1220        version: u32,
1221        init_cid: ConnectionId,
1222        loc_cid: ConnectionId,
1223        rem_cid: ConnectionId,
1224        addresses: FourTuple,
1225        now: Instant,
1226        tls: Box<dyn crypto::Session>,
1227        transport_config: Arc<TransportConfig>,
1228        side_args: SideArgs,
1229    ) -> Connection {
1230        let mut rng_seed = [0; 32];
1231        self.rng.fill_bytes(&mut rng_seed);
1232        let side = side_args.side();
1233        let pref_addr_cid = side_args.pref_addr_cid();
1234        let conn = Connection::new(
1235            self.config.clone(),
1236            transport_config,
1237            init_cid,
1238            loc_cid,
1239            rem_cid,
1240            addresses.remote,
1241            addresses.local_ip,
1242            tls,
1243            self.local_cid_generator.as_ref(),
1244            now,
1245            version,
1246            self.allow_mtud,
1247            rng_seed,
1248            side_args,
1249        );
1250
1251        let mut cids_issued = 0;
1252        let mut loc_cids = FxHashMap::default();
1253
1254        loc_cids.insert(cids_issued, loc_cid);
1255        cids_issued += 1;
1256
1257        if let Some(cid) = pref_addr_cid {
1258            debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1259            loc_cids.insert(cids_issued, cid);
1260            cids_issued += 1;
1261        }
1262
1263        let id = self.connections.insert(ConnectionMeta {
1264            init_cid,
1265            cids_issued,
1266            loc_cids,
1267            addresses,
1268            side,
1269            reset_token: None,
1270            peer_id: None,
1271        });
1272        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1273
1274        self.index.insert_conn(addresses, loc_cid, ch, side);
1275
1276        conn
1277    }
1278
1279    fn initial_close(
1280        &mut self,
1281        version: u32,
1282        addresses: FourTuple,
1283        crypto: &Keys,
1284        remote_id: &ConnectionId,
1285        reason: TransportError,
1286        buf: &mut Vec<u8>,
1287    ) -> Transmit {
1288        // We don't need to worry about CID collisions in initial closes because the peer
1289        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
1290        // unexpected response.
1291        let local_id = self.local_cid_generator.generate_cid();
1292        let number = PacketNumber::U8(0);
1293        let header = Header::Initial(InitialHeader {
1294            dst_cid: *remote_id,
1295            src_cid: local_id,
1296            number,
1297            token: Bytes::new(),
1298            version,
1299        });
1300
1301        let partial_encode = header.encode(buf);
1302        let max_len =
1303            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1304        frame::Close::from(reason).encode(buf, max_len);
1305        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1306        partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1307        Transmit {
1308            destination: addresses.remote,
1309            ecn: None,
1310            size: buf.len(),
1311            segment_size: None,
1312            src_ip: addresses.local_ip,
1313        }
1314    }
1315
1316    /// Access the configuration used by this endpoint
1317    pub fn config(&self) -> &EndpointConfig {
1318        &self.config
1319    }
1320
1321    /// Number of connections that are currently open
1322    pub fn open_connections(&self) -> usize {
1323        self.connections.len()
1324    }
1325
1326    /// Counter for the number of bytes currently used
1327    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
1328    pub fn incoming_buffer_bytes(&self) -> u64 {
1329        self.all_incoming_buffers_total_bytes
1330    }
1331
1332    #[cfg(test)]
1333    pub(crate) fn known_connections(&self) -> usize {
1334        let x = self.connections.len();
1335        debug_assert_eq!(x, self.index.connection_ids_initial.len());
1336        // Not all connections have known reset tokens
1337        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1338        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
1339        debug_assert!(x >= self.index.incoming_connection_remotes.len());
1340        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1341        x
1342    }
1343
1344    #[cfg(test)]
1345    pub(crate) fn known_cids(&self) -> usize {
1346        self.index.connection_ids.len()
1347    }
1348
1349    /// Whether we've used up 3/4 of the available CID space
1350    ///
1351    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
1352    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
1353    fn cids_exhausted(&self) -> bool {
1354        self.local_cid_generator.cid_len() <= 4
1355            && self.local_cid_generator.cid_len() != 0
1356            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1357                - self.index.connection_ids.len())
1358                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1359    }
1360}
1361
1362impl fmt::Debug for Endpoint {
1363    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1364        fmt.debug_struct("Endpoint")
1365            .field("rng", &self.rng)
1366            .field("index", &self.index)
1367            .field("connections", &self.connections)
1368            .field("config", &self.config)
1369            .field("server_config", &self.server_config)
1370            // incoming_buffers too large
1371            .field("incoming_buffers.len", &self.incoming_buffers.len())
1372            .field(
1373                "all_incoming_buffers_total_bytes",
1374                &self.all_incoming_buffers_total_bytes,
1375            )
1376            .finish()
1377    }
1378}
1379
1380/// Buffered Initial and 0-RTT messages for a pending incoming connection
1381#[derive(Default)]
1382struct IncomingBuffer {
1383    datagrams: Vec<DatagramConnectionEvent>,
1384    total_bytes: u64,
1385}
1386
1387/// Part of protocol state incoming datagrams can be routed to
1388#[derive(Copy, Clone, Debug)]
1389enum RouteDatagramTo {
1390    Incoming(usize),
1391    Connection(ConnectionHandle),
1392}
1393
1394/// Maps packets to existing connections
1395#[derive(Default, Debug)]
1396struct ConnectionIndex {
1397    /// Identifies connections based on the initial DCID the peer utilized
1398    ///
1399    /// Uses a standard `HashMap` to protect against hash collision attacks.
1400    ///
1401    /// Used by the server, not the client.
1402    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1403    /// Identifies connections based on locally created CIDs
1404    ///
1405    /// Uses a cheaper hash function since keys are locally created
1406    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1407    /// Identifies incoming connections with zero-length CIDs
1408    ///
1409    /// Uses a standard `HashMap` to protect against hash collision attacks.
1410    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1411    /// Identifies outgoing connections with zero-length CIDs
1412    ///
1413    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
1414    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
1415    /// may be established per remote. We must omit the local address from the key because we don't
1416    /// necessarily know what address we're sending from, and hence receiving at.
1417    ///
1418    /// Uses a standard `HashMap` to protect against hash collision attacks.
1419    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1420    /// Reset tokens provided by the peer for the CID each connection is currently sending to
1421    ///
1422    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
1423    /// recipient, if any.
1424    connection_reset_tokens: ResetTokenTable,
1425}
1426
1427impl ConnectionIndex {
1428    /// Associate an incoming connection with its initial destination CID
1429    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1430        if dst_cid.is_empty() {
1431            return;
1432        }
1433        self.connection_ids_initial
1434            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1435    }
1436
1437    /// Remove an association with an initial destination CID
1438    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1439        if dst_cid.is_empty() {
1440            return;
1441        }
1442        let removed = self.connection_ids_initial.remove(&dst_cid);
1443        debug_assert!(removed.is_some());
1444    }
1445
1446    /// Associate a connection with its initial destination CID
1447    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1448        if dst_cid.is_empty() {
1449            return;
1450        }
1451        self.connection_ids_initial
1452            .insert(dst_cid, RouteDatagramTo::Connection(connection));
1453    }
1454
1455    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1456    /// its current 4-tuple
1457    fn insert_conn(
1458        &mut self,
1459        addresses: FourTuple,
1460        dst_cid: ConnectionId,
1461        connection: ConnectionHandle,
1462        side: Side,
1463    ) {
1464        match dst_cid.len() {
1465            0 => match side {
1466                Side::Server => {
1467                    self.incoming_connection_remotes
1468                        .insert(addresses, connection);
1469                }
1470                Side::Client => {
1471                    self.outgoing_connection_remotes
1472                        .insert(addresses.remote, connection);
1473                }
1474            },
1475            _ => {
1476                self.connection_ids.insert(dst_cid, connection);
1477            }
1478        }
1479    }
1480
1481    /// Discard a connection ID
1482    fn retire(&mut self, dst_cid: ConnectionId) {
1483        self.connection_ids.remove(&dst_cid);
1484    }
1485
1486    /// Remove all references to a connection
1487    fn remove(&mut self, conn: &ConnectionMeta) {
1488        if conn.side.is_server() {
1489            self.remove_initial(conn.init_cid);
1490        }
1491        for cid in conn.loc_cids.values() {
1492            self.connection_ids.remove(cid);
1493        }
1494        self.incoming_connection_remotes.remove(&conn.addresses);
1495        self.outgoing_connection_remotes
1496            .remove(&conn.addresses.remote);
1497        if let Some((remote, token)) = conn.reset_token {
1498            self.connection_reset_tokens.remove(remote, token);
1499        }
1500    }
1501
1502    /// Find the existing connection that `datagram` should be routed to, if any
1503    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1504        let dst_cid = datagram.dst_cid();
1505        let is_empty_cid = dst_cid.is_empty();
1506        
1507        // Fast path: Try most common lookup first (non-empty CID)
1508        if !is_empty_cid {
1509            if let Some(&ch) = self.connection_ids.get(dst_cid) {
1510                return Some(RouteDatagramTo::Connection(ch));
1511            }
1512        }
1513        
1514        // Initial/0RTT packet lookup
1515        if datagram.is_initial() || datagram.is_0rtt() {
1516            if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1517                return Some(ch);
1518            }
1519        }
1520        
1521        // Empty CID lookup (less common, do after fast path)
1522        if is_empty_cid {
1523            // Check incoming connections first (servers handle more incoming)
1524            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1525                return Some(RouteDatagramTo::Connection(ch));
1526            }
1527            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1528                return Some(RouteDatagramTo::Connection(ch));
1529            }
1530        }
1531        
1532        // Stateless reset token lookup (least common, do last)
1533        let data = datagram.data();
1534        if data.len() < RESET_TOKEN_SIZE {
1535            return None;
1536        }
1537        self.connection_reset_tokens
1538            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1539            .cloned()
1540            .map(RouteDatagramTo::Connection)
1541    }
1542}
1543
1544#[derive(Debug)]
1545pub(crate) struct ConnectionMeta {
1546    init_cid: ConnectionId,
1547    /// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
1548    cids_issued: u64,
1549    loc_cids: FxHashMap<u64, ConnectionId>,
1550    /// Remote/local addresses the connection began with
1551    ///
1552    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1553    /// bother keeping it up to date.
1554    addresses: FourTuple,
1555    side: Side,
1556    /// Reset token provided by the peer for the CID we're currently sending to, and the address
1557    /// being sent to
1558    reset_token: Option<(SocketAddr, ResetToken)>,
1559    /// Peer ID for this connection, used for relay functionality
1560    peer_id: Option<PeerId>,
1561}
1562
1563/// Internal identifier for a `Connection` currently associated with an endpoint
1564#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1565pub struct ConnectionHandle(pub usize);
1566
1567impl From<ConnectionHandle> for usize {
1568    fn from(x: ConnectionHandle) -> Self {
1569        x.0
1570    }
1571}
1572
1573impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1574    type Output = ConnectionMeta;
1575    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1576        &self[ch.0]
1577    }
1578}
1579
1580impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1581    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1582        &mut self[ch.0]
1583    }
1584}
1585
1586/// Event resulting from processing a single datagram
1587pub enum DatagramEvent {
1588    /// The datagram is redirected to its `Connection`
1589    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1590    /// The datagram may result in starting a new `Connection`
1591    NewConnection(Incoming),
1592    /// Response generated directly by the endpoint
1593    Response(Transmit),
1594}
1595
1596/// An incoming connection for which the server has not yet begun its part of the handshake.
1597pub struct Incoming {
1598    received_at: Instant,
1599    addresses: FourTuple,
1600    ecn: Option<EcnCodepoint>,
1601    packet: InitialPacket,
1602    rest: Option<BytesMut>,
1603    crypto: Keys,
1604    token: IncomingToken,
1605    incoming_idx: usize,
1606    improper_drop_warner: IncomingImproperDropWarner,
1607}
1608
1609impl Incoming {
1610    /// The local IP address which was used when the peer established the connection
1611    ///
1612    /// This has the same behavior as [`Connection::local_ip`].
1613    pub fn local_ip(&self) -> Option<IpAddr> {
1614        self.addresses.local_ip
1615    }
1616
1617    /// The peer's UDP address
1618    pub fn remote_address(&self) -> SocketAddr {
1619        self.addresses.remote
1620    }
1621
1622    /// Whether the socket address that is initiating this connection has been validated
1623    ///
1624    /// This means that the sender of the initial packet has proved that they can receive traffic
1625    /// sent to `self.remote_address()`.
1626    ///
1627    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1628    /// The inverse is not guaranteed.
1629    pub fn remote_address_validated(&self) -> bool {
1630        self.token.validated
1631    }
1632
1633    /// Whether it is legal to respond with a retry packet
1634    ///
1635    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1636    /// The inverse is not guaranteed.
1637    pub fn may_retry(&self) -> bool {
1638        self.token.retry_src_cid.is_none()
1639    }
1640
1641    /// The original destination connection ID sent by the client
1642    pub fn orig_dst_cid(&self) -> &ConnectionId {
1643        &self.token.orig_dst_cid
1644    }
1645}
1646
1647impl fmt::Debug for Incoming {
1648    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1649        f.debug_struct("Incoming")
1650            .field("addresses", &self.addresses)
1651            .field("ecn", &self.ecn)
1652            // packet doesn't implement debug
1653            // rest is too big and not meaningful enough
1654            .field("token", &self.token)
1655            .field("incoming_idx", &self.incoming_idx)
1656            // improper drop warner contains no information
1657            .finish_non_exhaustive()
1658    }
1659}
1660
1661struct IncomingImproperDropWarner;
1662
1663impl IncomingImproperDropWarner {
1664    fn dismiss(self) {
1665        mem::forget(self);
1666    }
1667}
1668
1669impl Drop for IncomingImproperDropWarner {
1670    fn drop(&mut self) {
1671        warn!(
1672            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1673               (may cause memory leak and eventual inability to accept new connections)"
1674        );
1675    }
1676}
1677
1678/// Errors in the parameters being used to create a new connection
1679///
1680/// These arise before any I/O has been performed.
1681#[derive(Debug, Error, Clone, PartialEq, Eq)]
1682pub enum ConnectError {
1683    /// The endpoint can no longer create new connections
1684    ///
1685    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1686    #[error("endpoint stopping")]
1687    EndpointStopping,
1688    /// The connection could not be created because not enough of the CID space is available
1689    ///
1690    /// Try using longer connection IDs
1691    #[error("CIDs exhausted")]
1692    CidsExhausted,
1693    /// The given server name was malformed
1694    #[error("invalid server name: {0}")]
1695    InvalidServerName(String),
1696    /// The remote [`SocketAddr`] supplied was malformed
1697    ///
1698    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1699    #[error("invalid remote address: {0}")]
1700    InvalidRemoteAddress(SocketAddr),
1701    /// No default client configuration was set up
1702    ///
1703    /// Use `Endpoint::connect_with` to specify a client configuration.
1704    #[error("no default client config")]
1705    NoDefaultClientConfig,
1706    /// The local endpoint does not support the QUIC version specified in the client configuration
1707    #[error("unsupported QUIC version")]
1708    UnsupportedVersion,
1709}
1710
1711/// Error type for attempting to accept an [`Incoming`]
1712#[derive(Debug)]
1713pub struct AcceptError {
1714    /// Underlying error describing reason for failure
1715    pub cause: ConnectionError,
1716    /// Optional response to transmit back
1717    pub response: Option<Transmit>,
1718}
1719
1720/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1721#[derive(Debug, Error)]
1722#[error("retry() with validated Incoming")]
1723pub struct RetryError(Box<Incoming>);
1724
1725impl RetryError {
1726    /// Get the [`Incoming`]
1727    pub fn into_incoming(self) -> Incoming {
1728        *self.0
1729    }
1730}
1731
1732/// Reset Tokens which are associated with peer socket addresses
1733///
1734/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1735/// peer generated and might be usable for hash collision attacks.
1736#[derive(Default, Debug)]
1737struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1738
1739impl ResetTokenTable {
1740    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1741        self.0
1742            .entry(remote)
1743            .or_default()
1744            .insert(token, ch)
1745            .is_some()
1746    }
1747
1748    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1749        use std::collections::hash_map::Entry;
1750        match self.0.entry(remote) {
1751            Entry::Vacant(_) => {}
1752            Entry::Occupied(mut e) => {
1753                e.get_mut().remove(&token);
1754                if e.get().is_empty() {
1755                    e.remove_entry();
1756                }
1757            }
1758        }
1759    }
1760
1761    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1762        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1763        self.0.get(&remote)?.get(&token)
1764    }
1765}
1766
1767/// Identifies a connection by the combination of remote and local addresses
1768///
1769/// Including the local ensures good behavior when the host has multiple IP addresses on the same
1770/// subnet and zero-length connection IDs are in use.
1771#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
1772struct FourTuple {
1773    remote: SocketAddr,
1774    // A single socket can only listen on a single port, so no need to store it explicitly
1775    local_ip: Option<IpAddr>,
1776}