quinn_proto/
endpoint.rs

1use std::{
2    collections::{HashMap, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
19    Side, Transmit, TransportConfig, TransportError,
20    cid_generator::ConnectionIdGenerator,
21    coding::BufMutExt,
22    config::{ClientConfig, EndpointConfig, ServerConfig},
23    connection::{Connection, ConnectionError, SideArgs},
24    crypto::{self, Keys, UnsupportedVersion},
25    frame,
26    packet::{
27        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28        PacketNumber, PartialDecode, ProtectedInitialHeader,
29    },
30    shared::{
31        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32        EndpointEvent, EndpointEventInner, IssuedCid,
33    },
34    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35    transport_parameters::{PreferredAddress, TransportParameters},
36};
37
38/// The main entry point to the library
39///
40/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
41/// connection-generated events via `handle` and `handle_event`.
42pub struct Endpoint {
43    rng: StdRng,
44    index: ConnectionIndex,
45    connections: Slab<ConnectionMeta>,
46    local_cid_generator: Box<dyn ConnectionIdGenerator>,
47    config: Arc<EndpointConfig>,
48    server_config: Option<Arc<ServerConfig>>,
49    /// Whether the underlying UDP socket promises not to fragment packets
50    allow_mtud: bool,
51    /// Time at which a stateless reset was most recently sent
52    last_stateless_reset: Option<Instant>,
53    /// Buffered Initial and 0-RTT messages for pending incoming connections
54    incoming_buffers: Slab<IncomingBuffer>,
55    all_incoming_buffers_total_bytes: u64,
56}
57
58impl Endpoint {
59    /// Create a new endpoint
60    ///
61    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
62    /// better performance. This requires that outgoing packets are never fragmented, which can be
63    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
64    ///
65    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
66    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
67    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
68    /// using [`EndpointConfig::rng_seed`].
69    pub fn new(
70        config: Arc<EndpointConfig>,
71        server_config: Option<Arc<ServerConfig>>,
72        allow_mtud: bool,
73        rng_seed: Option<[u8; 32]>,
74    ) -> Self {
75        let rng_seed = rng_seed.or(config.rng_seed);
76        Self {
77            rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78            index: ConnectionIndex::default(),
79            connections: Slab::new(),
80            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81            config,
82            server_config,
83            allow_mtud,
84            last_stateless_reset: None,
85            incoming_buffers: Slab::new(),
86            all_incoming_buffers_total_bytes: 0,
87        }
88    }
89
90    /// Replace the server configuration, affecting new incoming connections only
91    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
92        self.server_config = server_config;
93    }
94
95    /// Process `EndpointEvent`s emitted from related `Connection`s
96    ///
97    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
98    pub fn handle_event(
99        &mut self,
100        ch: ConnectionHandle,
101        event: EndpointEvent,
102    ) -> Option<ConnectionEvent> {
103        use EndpointEventInner::*;
104        match event.0 {
105            NeedIdentifiers(now, n) => {
106                return Some(self.send_new_identifiers(now, ch, n));
107            }
108            ResetToken(remote, token) => {
109                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
110                    self.index.connection_reset_tokens.remove(old.0, old.1);
111                }
112                if self.index.connection_reset_tokens.insert(remote, token, ch) {
113                    warn!("duplicate reset token");
114                }
115            }
116            RetireConnectionId(now, seq, allow_more_cids) => {
117                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
118                    trace!("peer retired CID {}: {}", seq, cid);
119                    self.index.retire(cid);
120                    if allow_more_cids {
121                        return Some(self.send_new_identifiers(now, ch, 1));
122                    }
123                }
124            }
125            Drained => {
126                if let Some(conn) = self.connections.try_remove(ch.0) {
127                    self.index.remove(&conn);
128                } else {
129                    // This indicates a bug in downstream code, which could cause spurious
130                    // connection loss instead of this error if the CID was (re)allocated prior to
131                    // the illegal call.
132                    error!(id = ch.0, "unknown connection drained");
133                }
134            }
135        }
136        None
137    }
138
139    /// Process an incoming UDP datagram
140    pub fn handle(
141        &mut self,
142        now: Instant,
143        remote: SocketAddr,
144        local_ip: Option<IpAddr>,
145        ecn: Option<EcnCodepoint>,
146        data: BytesMut,
147        buf: &mut Vec<u8>,
148    ) -> Option<DatagramEvent> {
149        // Partially decode packet or short-circuit if unable
150        let datagram_len = data.len();
151        let event = match PartialDecode::new(
152            data,
153            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
154            &self.config.supported_versions,
155            self.config.grease_quic_bit,
156        ) {
157            Ok((first_decode, remaining)) => DatagramConnectionEvent {
158                now,
159                remote,
160                ecn,
161                first_decode,
162                remaining,
163            },
164            Err(PacketDecodeError::UnsupportedVersion {
165                src_cid,
166                dst_cid,
167                version,
168            }) => {
169                if self.server_config.is_none() {
170                    debug!("dropping packet with unsupported version");
171                    return None;
172                }
173                trace!("sending version negotiation");
174                // Negotiate versions
175                Header::VersionNegotiate {
176                    random: self.rng.random::<u8>() | 0x40,
177                    src_cid: dst_cid,
178                    dst_cid: src_cid,
179                }
180                .encode(buf);
181                // Grease with a reserved version
182                buf.write::<u32>(match version {
183                    0x0a1a_2a3a => 0x0a1a_2a4a,
184                    _ => 0x0a1a_2a3a,
185                });
186                for &version in &self.config.supported_versions {
187                    buf.write(version);
188                }
189                return Some(DatagramEvent::Response(Transmit {
190                    destination: remote,
191                    ecn: None,
192                    size: buf.len(),
193                    segment_size: None,
194                    src_ip: local_ip,
195                }));
196            }
197            Err(e) => {
198                trace!("malformed header: {}", e);
199                return None;
200            }
201        };
202
203        let addresses = FourTuple { remote, local_ip };
204        let dst_cid = event.first_decode.dst_cid();
205
206        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
207            // Handle packet on existing connection
208            match route_to {
209                RouteDatagramTo::Incoming(incoming_idx) => {
210                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
211                    let config = &self.server_config.as_ref().unwrap();
212
213                    if incoming_buffer
214                        .total_bytes
215                        .checked_add(datagram_len as u64)
216                        .is_some_and(|n| n <= config.incoming_buffer_size)
217                        && self
218                            .all_incoming_buffers_total_bytes
219                            .checked_add(datagram_len as u64)
220                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
221                    {
222                        incoming_buffer.datagrams.push(event);
223                        incoming_buffer.total_bytes += datagram_len as u64;
224                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
225                    }
226
227                    None
228                }
229                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
230                    ch,
231                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
232                )),
233            }
234        } else if event.first_decode.initial_header().is_some() {
235            // Potentially create a new connection
236
237            self.handle_first_packet(datagram_len, event, addresses, buf)
238        } else if event.first_decode.has_long_header() {
239            debug!(
240                "ignoring non-initial packet for unknown connection {}",
241                dst_cid
242            );
243            None
244        } else if !event.first_decode.is_initial()
245            && self.local_cid_generator.validate(dst_cid).is_err()
246        {
247            debug!("dropping packet with invalid CID");
248            None
249        } else if dst_cid.is_empty() {
250            trace!("dropping unrecognized short packet without ID");
251            None
252        } else {
253            // If we got this far, we're receiving a seemingly valid packet for an unknown
254            // connection. Send a stateless reset if possible.
255            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
256                .map(DatagramEvent::Response)
257        }
258    }
259
260    fn stateless_reset(
261        &mut self,
262        now: Instant,
263        inciting_dgram_len: usize,
264        addresses: FourTuple,
265        dst_cid: ConnectionId,
266        buf: &mut Vec<u8>,
267    ) -> Option<Transmit> {
268        if self
269            .last_stateless_reset
270            .is_some_and(|last| last + self.config.min_reset_interval > now)
271        {
272            debug!("ignoring unexpected packet within minimum stateless reset interval");
273            return None;
274        }
275
276        /// Minimum amount of padding for the stateless reset to look like a short-header packet
277        const MIN_PADDING_LEN: usize = 5;
278
279        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
280        // smaller than the inciting packet.
281        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
282            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
283            _ => {
284                debug!(
285                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
286                    inciting_dgram_len
287                );
288                return None;
289            }
290        };
291
292        debug!(
293            "sending stateless reset for {} to {}",
294            dst_cid, addresses.remote
295        );
296        self.last_stateless_reset = Some(now);
297        // Resets with at least this much padding can't possibly be distinguished from real packets
298        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
299        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
300            max_padding_len
301        } else {
302            self.rng
303                .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
304        };
305        buf.reserve(padding_len + RESET_TOKEN_SIZE);
306        buf.resize(padding_len, 0);
307        self.rng.fill_bytes(&mut buf[0..padding_len]);
308        buf[0] = 0b0100_0000 | (buf[0] >> 2);
309        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
310
311        debug_assert!(buf.len() < inciting_dgram_len);
312
313        Some(Transmit {
314            destination: addresses.remote,
315            ecn: None,
316            size: buf.len(),
317            segment_size: None,
318            src_ip: addresses.local_ip,
319        })
320    }
321
322    /// Initiate a connection
323    pub fn connect(
324        &mut self,
325        now: Instant,
326        config: ClientConfig,
327        remote: SocketAddr,
328        server_name: &str,
329    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
330        if self.cids_exhausted() {
331            return Err(ConnectError::CidsExhausted);
332        }
333        if remote.port() == 0 || remote.ip().is_unspecified() {
334            return Err(ConnectError::InvalidRemoteAddress(remote));
335        }
336        if !self.config.supported_versions.contains(&config.version) {
337            return Err(ConnectError::UnsupportedVersion);
338        }
339
340        let remote_id = (config.initial_dst_cid_provider)();
341        trace!(initial_dcid = %remote_id);
342
343        let ch = ConnectionHandle(self.connections.vacant_key());
344        let loc_cid = self.new_cid(ch);
345        let params = TransportParameters::new(
346            &config.transport,
347            &self.config,
348            self.local_cid_generator.as_ref(),
349            loc_cid,
350            None,
351            &mut self.rng,
352        );
353        let tls = config
354            .crypto
355            .start_session(config.version, server_name, &params)?;
356
357        let conn = self.add_connection(
358            ch,
359            config.version,
360            remote_id,
361            loc_cid,
362            remote_id,
363            FourTuple {
364                remote,
365                local_ip: None,
366            },
367            now,
368            tls,
369            config.transport,
370            SideArgs::Client {
371                token_store: config.token_store,
372                server_name: server_name.into(),
373            },
374        );
375        Ok((ch, conn))
376    }
377
378    fn send_new_identifiers(
379        &mut self,
380        now: Instant,
381        ch: ConnectionHandle,
382        num: u64,
383    ) -> ConnectionEvent {
384        let mut ids = vec![];
385        for _ in 0..num {
386            let id = self.new_cid(ch);
387            let meta = &mut self.connections[ch];
388            let sequence = meta.cids_issued;
389            meta.cids_issued += 1;
390            meta.loc_cids.insert(sequence, id);
391            ids.push(IssuedCid {
392                sequence,
393                id,
394                reset_token: ResetToken::new(&*self.config.reset_key, id),
395            });
396        }
397        ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
398    }
399
400    /// Generate a connection ID for `ch`
401    fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
402        loop {
403            let cid = self.local_cid_generator.generate_cid();
404            if cid.is_empty() {
405                // Zero-length CID; nothing to track
406                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
407                return cid;
408            }
409            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
410                e.insert(ch);
411                break cid;
412            }
413        }
414    }
415
416    fn handle_first_packet(
417        &mut self,
418        datagram_len: usize,
419        event: DatagramConnectionEvent,
420        addresses: FourTuple,
421        buf: &mut Vec<u8>,
422    ) -> Option<DatagramEvent> {
423        let dst_cid = event.first_decode.dst_cid();
424        let header = event.first_decode.initial_header().unwrap();
425
426        let Some(server_config) = &self.server_config else {
427            debug!("packet for unrecognized connection {}", dst_cid);
428            return self
429                .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
430                .map(DatagramEvent::Response);
431        };
432
433        if datagram_len < MIN_INITIAL_SIZE as usize {
434            debug!("ignoring short initial for connection {}", dst_cid);
435            return None;
436        }
437
438        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
439            Ok(keys) => keys,
440            Err(UnsupportedVersion) => {
441                // This probably indicates that the user set supported_versions incorrectly in
442                // `EndpointConfig`.
443                debug!(
444                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
445                    header.version
446                );
447                return None;
448            }
449        };
450
451        if let Err(reason) = self.early_validate_first_packet(header) {
452            return Some(DatagramEvent::Response(self.initial_close(
453                header.version,
454                addresses,
455                &crypto,
456                &header.src_cid,
457                reason,
458                buf,
459            )));
460        }
461
462        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
463            Ok(packet) => packet,
464            Err(e) => {
465                trace!("unable to decode initial packet: {}", e);
466                return None;
467            }
468        };
469
470        if !packet.reserved_bits_valid() {
471            debug!("dropping connection attempt with invalid reserved bits");
472            return None;
473        }
474
475        let Header::Initial(header) = packet.header else {
476            panic!("non-initial packet in handle_first_packet()");
477        };
478
479        let server_config = self.server_config.as_ref().unwrap().clone();
480
481        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
482            Ok(token) => token,
483            Err(InvalidRetryTokenError) => {
484                debug!("rejecting invalid retry token");
485                return Some(DatagramEvent::Response(self.initial_close(
486                    header.version,
487                    addresses,
488                    &crypto,
489                    &header.src_cid,
490                    TransportError::INVALID_TOKEN(""),
491                    buf,
492                )));
493            }
494        };
495
496        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
497        self.index
498            .insert_initial_incoming(header.dst_cid, incoming_idx);
499
500        Some(DatagramEvent::NewConnection(Incoming {
501            received_at: event.now,
502            addresses,
503            ecn: event.ecn,
504            packet: InitialPacket {
505                header,
506                header_data: packet.header_data,
507                payload: packet.payload,
508            },
509            rest: event.remaining,
510            crypto,
511            token,
512            incoming_idx,
513            improper_drop_warner: IncomingImproperDropWarner,
514        }))
515    }
516
517    /// Attempt to accept this incoming connection (an error may still occur)
518    // AcceptError cannot be made smaller without semver breakage
519    #[allow(clippy::result_large_err)]
520    pub fn accept(
521        &mut self,
522        mut incoming: Incoming,
523        now: Instant,
524        buf: &mut Vec<u8>,
525        server_config: Option<Arc<ServerConfig>>,
526    ) -> Result<(ConnectionHandle, Connection), AcceptError> {
527        let remote_address_validated = incoming.remote_address_validated();
528        incoming.improper_drop_warner.dismiss();
529        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
530        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
531
532        let packet_number = incoming.packet.header.number.expand(0);
533        let InitialHeader {
534            src_cid,
535            dst_cid,
536            version,
537            ..
538        } = incoming.packet.header;
539        let server_config =
540            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
541
542        if server_config
543            .transport
544            .max_idle_timeout
545            .is_some_and(|timeout| {
546                incoming.received_at + Duration::from_millis(timeout.into()) <= now
547            })
548        {
549            debug!("abandoning accept of stale initial");
550            self.index.remove_initial(dst_cid);
551            return Err(AcceptError {
552                cause: ConnectionError::TimedOut,
553                response: None,
554            });
555        }
556
557        if self.cids_exhausted() {
558            debug!("refusing connection");
559            self.index.remove_initial(dst_cid);
560            return Err(AcceptError {
561                cause: ConnectionError::CidsExhausted,
562                response: Some(self.initial_close(
563                    version,
564                    incoming.addresses,
565                    &incoming.crypto,
566                    &src_cid,
567                    TransportError::CONNECTION_REFUSED(""),
568                    buf,
569                )),
570            });
571        }
572
573        if incoming
574            .crypto
575            .packet
576            .remote
577            .decrypt(
578                packet_number,
579                &incoming.packet.header_data,
580                &mut incoming.packet.payload,
581            )
582            .is_err()
583        {
584            debug!(packet_number, "failed to authenticate initial packet");
585            self.index.remove_initial(dst_cid);
586            return Err(AcceptError {
587                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
588                response: None,
589            });
590        };
591
592        let ch = ConnectionHandle(self.connections.vacant_key());
593        let loc_cid = self.new_cid(ch);
594        let mut params = TransportParameters::new(
595            &server_config.transport,
596            &self.config,
597            self.local_cid_generator.as_ref(),
598            loc_cid,
599            Some(&server_config),
600            &mut self.rng,
601        );
602        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
603        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
604        params.retry_src_cid = incoming.token.retry_src_cid;
605        let mut pref_addr_cid = None;
606        if server_config.has_preferred_address() {
607            let cid = self.new_cid(ch);
608            pref_addr_cid = Some(cid);
609            params.preferred_address = Some(PreferredAddress {
610                address_v4: server_config.preferred_address_v4,
611                address_v6: server_config.preferred_address_v6,
612                connection_id: cid,
613                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
614            });
615        }
616
617        let tls = server_config.crypto.clone().start_session(version, &params);
618        let transport_config = server_config.transport.clone();
619        let mut conn = self.add_connection(
620            ch,
621            version,
622            dst_cid,
623            loc_cid,
624            src_cid,
625            incoming.addresses,
626            incoming.received_at,
627            tls,
628            transport_config,
629            SideArgs::Server {
630                server_config,
631                pref_addr_cid,
632                path_validated: remote_address_validated,
633            },
634        );
635        self.index.insert_initial(dst_cid, ch);
636
637        match conn.handle_first_packet(
638            incoming.received_at,
639            incoming.addresses.remote,
640            incoming.ecn,
641            packet_number,
642            incoming.packet,
643            incoming.rest,
644        ) {
645            Ok(()) => {
646                trace!(id = ch.0, icid = %dst_cid, "new connection");
647
648                for event in incoming_buffer.datagrams {
649                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
650                }
651
652                Ok((ch, conn))
653            }
654            Err(e) => {
655                debug!("handshake failed: {}", e);
656                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
657                let response = match e {
658                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
659                        version,
660                        incoming.addresses,
661                        &incoming.crypto,
662                        &src_cid,
663                        e.clone(),
664                        buf,
665                    )),
666                    _ => None,
667                };
668                Err(AcceptError { cause: e, response })
669            }
670        }
671    }
672
673    /// Check if we should refuse a connection attempt regardless of the packet's contents
674    fn early_validate_first_packet(
675        &mut self,
676        header: &ProtectedInitialHeader,
677    ) -> Result<(), TransportError> {
678        let config = &self.server_config.as_ref().unwrap();
679        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
680            return Err(TransportError::CONNECTION_REFUSED(""));
681        }
682
683        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
684        // bytes. If this is a Retry packet, then the length must instead match our usual CID
685        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
686        // also need to validate CID length for those after decoding the token.
687        if header.dst_cid.len() < 8
688            && (header.token_pos.is_empty()
689                || header.dst_cid.len() != self.local_cid_generator.cid_len())
690        {
691            debug!(
692                "rejecting connection due to invalid DCID length {}",
693                header.dst_cid.len()
694            );
695            return Err(TransportError::PROTOCOL_VIOLATION(
696                "invalid destination CID length",
697            ));
698        }
699
700        Ok(())
701    }
702
703    /// Reject this incoming connection attempt
704    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
705        self.clean_up_incoming(&incoming);
706        incoming.improper_drop_warner.dismiss();
707
708        self.initial_close(
709            incoming.packet.header.version,
710            incoming.addresses,
711            &incoming.crypto,
712            &incoming.packet.header.src_cid,
713            TransportError::CONNECTION_REFUSED(""),
714            buf,
715        )
716    }
717
718    /// Respond with a retry packet, requiring the client to retry with address validation
719    ///
720    /// Errors if `incoming.may_retry()` is false.
721    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
722        if !incoming.may_retry() {
723            return Err(RetryError(Box::new(incoming)));
724        }
725
726        self.clean_up_incoming(&incoming);
727        incoming.improper_drop_warner.dismiss();
728
729        let server_config = self.server_config.as_ref().unwrap();
730
731        // First Initial
732        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
733        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
734        // with established connections. In the unlikely event that a collision occurs
735        // between two connections in the initial phase, both will fail fast and may be
736        // retried by the application layer.
737        let loc_cid = self.local_cid_generator.generate_cid();
738
739        let payload = TokenPayload::Retry {
740            address: incoming.addresses.remote,
741            orig_dst_cid: incoming.packet.header.dst_cid,
742            issued: server_config.time_source.now(),
743        };
744        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
745
746        let header = Header::Retry {
747            src_cid: loc_cid,
748            dst_cid: incoming.packet.header.src_cid,
749            version: incoming.packet.header.version,
750        };
751
752        let encode = header.encode(buf);
753        buf.put_slice(&token);
754        buf.extend_from_slice(&server_config.crypto.retry_tag(
755            incoming.packet.header.version,
756            &incoming.packet.header.dst_cid,
757            buf,
758        ));
759        encode.finish(buf, &*incoming.crypto.header.local, None);
760
761        Ok(Transmit {
762            destination: incoming.addresses.remote,
763            ecn: None,
764            size: buf.len(),
765            segment_size: None,
766            src_ip: incoming.addresses.local_ip,
767        })
768    }
769
770    /// Ignore this incoming connection attempt, not sending any packet in response
771    ///
772    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
773    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
774    pub fn ignore(&mut self, incoming: Incoming) {
775        self.clean_up_incoming(&incoming);
776        incoming.improper_drop_warner.dismiss();
777    }
778
779    /// Clean up endpoint data structures associated with an `Incoming`.
780    fn clean_up_incoming(&mut self, incoming: &Incoming) {
781        self.index.remove_initial(incoming.packet.header.dst_cid);
782        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
783        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
784    }
785
786    fn add_connection(
787        &mut self,
788        ch: ConnectionHandle,
789        version: u32,
790        init_cid: ConnectionId,
791        loc_cid: ConnectionId,
792        rem_cid: ConnectionId,
793        addresses: FourTuple,
794        now: Instant,
795        tls: Box<dyn crypto::Session>,
796        transport_config: Arc<TransportConfig>,
797        side_args: SideArgs,
798    ) -> Connection {
799        let mut rng_seed = [0; 32];
800        self.rng.fill_bytes(&mut rng_seed);
801        let side = side_args.side();
802        let pref_addr_cid = side_args.pref_addr_cid();
803        let conn = Connection::new(
804            self.config.clone(),
805            transport_config,
806            init_cid,
807            loc_cid,
808            rem_cid,
809            addresses.remote,
810            addresses.local_ip,
811            tls,
812            self.local_cid_generator.as_ref(),
813            now,
814            version,
815            self.allow_mtud,
816            rng_seed,
817            side_args,
818        );
819
820        let mut cids_issued = 0;
821        let mut loc_cids = FxHashMap::default();
822
823        loc_cids.insert(cids_issued, loc_cid);
824        cids_issued += 1;
825
826        if let Some(cid) = pref_addr_cid {
827            debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
828            loc_cids.insert(cids_issued, cid);
829            cids_issued += 1;
830        }
831
832        let id = self.connections.insert(ConnectionMeta {
833            init_cid,
834            cids_issued,
835            loc_cids,
836            addresses,
837            side,
838            reset_token: None,
839        });
840        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
841
842        self.index.insert_conn(addresses, loc_cid, ch, side);
843
844        conn
845    }
846
847    fn initial_close(
848        &mut self,
849        version: u32,
850        addresses: FourTuple,
851        crypto: &Keys,
852        remote_id: &ConnectionId,
853        reason: TransportError,
854        buf: &mut Vec<u8>,
855    ) -> Transmit {
856        // We don't need to worry about CID collisions in initial closes because the peer
857        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
858        // unexpected response.
859        let local_id = self.local_cid_generator.generate_cid();
860        let number = PacketNumber::U8(0);
861        let header = Header::Initial(InitialHeader {
862            dst_cid: *remote_id,
863            src_cid: local_id,
864            number,
865            token: Bytes::new(),
866            version,
867        });
868
869        let partial_encode = header.encode(buf);
870        let max_len =
871            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
872        frame::Close::from(reason).encode(buf, max_len);
873        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
874        partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
875        Transmit {
876            destination: addresses.remote,
877            ecn: None,
878            size: buf.len(),
879            segment_size: None,
880            src_ip: addresses.local_ip,
881        }
882    }
883
884    /// Access the configuration used by this endpoint
885    pub fn config(&self) -> &EndpointConfig {
886        &self.config
887    }
888
889    /// Number of connections that are currently open
890    pub fn open_connections(&self) -> usize {
891        self.connections.len()
892    }
893
894    /// Counter for the number of bytes currently used
895    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
896    pub fn incoming_buffer_bytes(&self) -> u64 {
897        self.all_incoming_buffers_total_bytes
898    }
899
/// Test helper: number of tracked connections, sanity-checking the index against it in
/// debug builds
#[cfg(test)]
pub(crate) fn known_connections(&self) -> usize {
    let count = self.connections.len();
    let index = &self.index;
    debug_assert_eq!(count, index.connection_ids_initial.len());
    // Not all connections have known reset tokens
    debug_assert!(index.connection_reset_tokens.0.len() <= count);
    // Not all connections have unique remotes, and 0-length CIDs might not be in use.
    debug_assert!(index.incoming_connection_remotes.len() <= count);
    debug_assert!(index.outgoing_connection_remotes.len() <= count);
    count
}
911
/// Test helper: number of locally created CIDs present in the connection index
#[cfg(test)]
pub(crate) fn known_cids(&self) -> usize {
    let local_cids = &self.index.connection_ids;
    local_cids.len()
}
916
917    /// Whether we've used up 3/4 of the available CID space
918    ///
919    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
920    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
921    fn cids_exhausted(&self) -> bool {
922        self.local_cid_generator.cid_len() <= 4
923            && self.local_cid_generator.cid_len() != 0
924            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
925                - self.index.connection_ids.len())
926                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
927    }
928}
929
930impl fmt::Debug for Endpoint {
931    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
932        fmt.debug_struct("Endpoint")
933            .field("rng", &self.rng)
934            .field("index", &self.index)
935            .field("connections", &self.connections)
936            .field("config", &self.config)
937            .field("server_config", &self.server_config)
938            // incoming_buffers too large
939            .field("incoming_buffers.len", &self.incoming_buffers.len())
940            .field(
941                "all_incoming_buffers_total_bytes",
942                &self.all_incoming_buffers_total_bytes,
943            )
944            .finish()
945    }
946}
947
948/// Buffered Initial and 0-RTT messages for a pending incoming connection
949#[derive(Default)]
950struct IncomingBuffer {
951    datagrams: Vec<DatagramConnectionEvent>,
952    total_bytes: u64,
953}
954
955/// Part of protocol state incoming datagrams can be routed to
956#[derive(Copy, Clone, Debug)]
957enum RouteDatagramTo {
958    Incoming(usize),
959    Connection(ConnectionHandle),
960}
961
962/// Maps packets to existing connections
963#[derive(Default, Debug)]
964struct ConnectionIndex {
965    /// Identifies connections based on the initial DCID the peer utilized
966    ///
967    /// Uses a standard `HashMap` to protect against hash collision attacks.
968    ///
969    /// Used by the server, not the client.
970    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
971    /// Identifies connections based on locally created CIDs
972    ///
973    /// Uses a cheaper hash function since keys are locally created
974    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
975    /// Identifies incoming connections with zero-length CIDs
976    ///
977    /// Uses a standard `HashMap` to protect against hash collision attacks.
978    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
979    /// Identifies outgoing connections with zero-length CIDs
980    ///
981    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
982    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
983    /// may be established per remote. We must omit the local address from the key because we don't
984    /// necessarily know what address we're sending from, and hence receiving at.
985    ///
986    /// Uses a standard `HashMap` to protect against hash collision attacks.
987    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
988    /// Reset tokens provided by the peer for the CID each connection is currently sending to
989    ///
990    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
991    /// recipient, if any.
992    connection_reset_tokens: ResetTokenTable,
993}
994
995impl ConnectionIndex {
996    /// Associate an incoming connection with its initial destination CID
997    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
998        if dst_cid.is_empty() {
999            return;
1000        }
1001        self.connection_ids_initial
1002            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1003    }
1004
1005    /// Remove an association with an initial destination CID
1006    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1007        if dst_cid.is_empty() {
1008            return;
1009        }
1010        let removed = self.connection_ids_initial.remove(&dst_cid);
1011        debug_assert!(removed.is_some());
1012    }
1013
1014    /// Associate a connection with its initial destination CID
1015    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1016        if dst_cid.is_empty() {
1017            return;
1018        }
1019        self.connection_ids_initial
1020            .insert(dst_cid, RouteDatagramTo::Connection(connection));
1021    }
1022
1023    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1024    /// its current 4-tuple
1025    fn insert_conn(
1026        &mut self,
1027        addresses: FourTuple,
1028        dst_cid: ConnectionId,
1029        connection: ConnectionHandle,
1030        side: Side,
1031    ) {
1032        match dst_cid.len() {
1033            0 => match side {
1034                Side::Server => {
1035                    self.incoming_connection_remotes
1036                        .insert(addresses, connection);
1037                }
1038                Side::Client => {
1039                    self.outgoing_connection_remotes
1040                        .insert(addresses.remote, connection);
1041                }
1042            },
1043            _ => {
1044                self.connection_ids.insert(dst_cid, connection);
1045            }
1046        }
1047    }
1048
1049    /// Discard a connection ID
1050    fn retire(&mut self, dst_cid: ConnectionId) {
1051        self.connection_ids.remove(&dst_cid);
1052    }
1053
1054    /// Remove all references to a connection
1055    fn remove(&mut self, conn: &ConnectionMeta) {
1056        if conn.side.is_server() {
1057            self.remove_initial(conn.init_cid);
1058        }
1059        for cid in conn.loc_cids.values() {
1060            self.connection_ids.remove(cid);
1061        }
1062        self.incoming_connection_remotes.remove(&conn.addresses);
1063        self.outgoing_connection_remotes
1064            .remove(&conn.addresses.remote);
1065        if let Some((remote, token)) = conn.reset_token {
1066            self.connection_reset_tokens.remove(remote, token);
1067        }
1068    }
1069
1070    /// Find the existing connection that `datagram` should be routed to, if any
1071    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1072        if !datagram.dst_cid().is_empty() {
1073            if let Some(&ch) = self.connection_ids.get(datagram.dst_cid()) {
1074                return Some(RouteDatagramTo::Connection(ch));
1075            }
1076        }
1077        if datagram.is_initial() || datagram.is_0rtt() {
1078            if let Some(&ch) = self.connection_ids_initial.get(datagram.dst_cid()) {
1079                return Some(ch);
1080            }
1081        }
1082        if datagram.dst_cid().is_empty() {
1083            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1084                return Some(RouteDatagramTo::Connection(ch));
1085            }
1086            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1087                return Some(RouteDatagramTo::Connection(ch));
1088            }
1089        }
1090        let data = datagram.data();
1091        if data.len() < RESET_TOKEN_SIZE {
1092            return None;
1093        }
1094        self.connection_reset_tokens
1095            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1096            .cloned()
1097            .map(RouteDatagramTo::Connection)
1098    }
1099}
1100
1101#[derive(Debug)]
1102pub(crate) struct ConnectionMeta {
1103    init_cid: ConnectionId,
1104    /// Number of local connection IDs that have been issued in NEW_CONNECTION_ID frames.
1105    cids_issued: u64,
1106    loc_cids: FxHashMap<u64, ConnectionId>,
1107    /// Remote/local addresses the connection began with
1108    ///
1109    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1110    /// bother keeping it up to date.
1111    addresses: FourTuple,
1112    side: Side,
1113    /// Reset token provided by the peer for the CID we're currently sending to, and the address
1114    /// being sent to
1115    reset_token: Option<(SocketAddr, ResetToken)>,
1116}
1117
/// Endpoint-internal identifier for a `Connection` that is currently associated with it
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1121
1122impl From<ConnectionHandle> for usize {
1123    fn from(x: ConnectionHandle) -> Self {
1124        x.0
1125    }
1126}
1127
1128impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1129    type Output = ConnectionMeta;
1130    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1131        &self[ch.0]
1132    }
1133}
1134
1135impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1136    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1137        &mut self[ch.0]
1138    }
1139}
1140
1141/// Event resulting from processing a single datagram
1142pub enum DatagramEvent {
1143    /// The datagram is redirected to its `Connection`
1144    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1145    /// The datagram may result in starting a new `Connection`
1146    NewConnection(Incoming),
1147    /// Response generated directly by the endpoint
1148    Response(Transmit),
1149}
1150
1151/// An incoming connection for which the server has not yet begun its part of the handshake.
1152pub struct Incoming {
1153    received_at: Instant,
1154    addresses: FourTuple,
1155    ecn: Option<EcnCodepoint>,
1156    packet: InitialPacket,
1157    rest: Option<BytesMut>,
1158    crypto: Keys,
1159    token: IncomingToken,
1160    incoming_idx: usize,
1161    improper_drop_warner: IncomingImproperDropWarner,
1162}
1163
1164impl Incoming {
1165    /// The local IP address which was used when the peer established the connection
1166    ///
1167    /// This has the same behavior as [`Connection::local_ip`].
1168    pub fn local_ip(&self) -> Option<IpAddr> {
1169        self.addresses.local_ip
1170    }
1171
1172    /// The peer's UDP address
1173    pub fn remote_address(&self) -> SocketAddr {
1174        self.addresses.remote
1175    }
1176
1177    /// Whether the socket address that is initiating this connection has been validated
1178    ///
1179    /// This means that the sender of the initial packet has proved that they can receive traffic
1180    /// sent to `self.remote_address()`.
1181    ///
1182    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1183    /// The inverse is not guaranteed.
1184    pub fn remote_address_validated(&self) -> bool {
1185        self.token.validated
1186    }
1187
1188    /// Whether it is legal to respond with a retry packet
1189    ///
1190    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1191    /// The inverse is not guaranteed.
1192    pub fn may_retry(&self) -> bool {
1193        self.token.retry_src_cid.is_none()
1194    }
1195
1196    /// The original destination connection ID sent by the client
1197    pub fn orig_dst_cid(&self) -> &ConnectionId {
1198        &self.token.orig_dst_cid
1199    }
1200}
1201
1202impl fmt::Debug for Incoming {
1203    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1204        f.debug_struct("Incoming")
1205            .field("addresses", &self.addresses)
1206            .field("ecn", &self.ecn)
1207            // packet doesn't implement debug
1208            // rest is too big and not meaningful enough
1209            .field("token", &self.token)
1210            .field("incoming_idx", &self.incoming_idx)
1211            // improper drop warner contains no information
1212            .finish_non_exhaustive()
1213    }
1214}
1215
/// Marker carried by an `Incoming`, used to warn when one is dropped instead of being handed
/// back to the endpoint
struct IncomingImproperDropWarner;
1217
1218impl IncomingImproperDropWarner {
1219    fn dismiss(self) {
1220        mem::forget(self);
1221    }
1222}
1223
1224impl Drop for IncomingImproperDropWarner {
1225    fn drop(&mut self) {
1226        warn!(
1227            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1228               (may cause memory leak and eventual inability to accept new connections)"
1229        );
1230    }
1231}
1232
1233/// Errors in the parameters being used to create a new connection
1234///
1235/// These arise before any I/O has been performed.
1236#[derive(Debug, Error, Clone, PartialEq, Eq)]
1237pub enum ConnectError {
1238    /// The endpoint can no longer create new connections
1239    ///
1240    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1241    #[error("endpoint stopping")]
1242    EndpointStopping,
1243    /// The connection could not be created because not enough of the CID space is available
1244    ///
1245    /// Try using longer connection IDs
1246    #[error("CIDs exhausted")]
1247    CidsExhausted,
1248    /// The given server name was malformed
1249    #[error("invalid server name: {0}")]
1250    InvalidServerName(String),
1251    /// The remote [`SocketAddr`] supplied was malformed
1252    ///
1253    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1254    #[error("invalid remote address: {0}")]
1255    InvalidRemoteAddress(SocketAddr),
1256    /// No default client configuration was set up
1257    ///
1258    /// Use `Endpoint::connect_with` to specify a client configuration.
1259    #[error("no default client config")]
1260    NoDefaultClientConfig,
1261    /// The local endpoint does not support the QUIC version specified in the client configuration
1262    #[error("unsupported QUIC version")]
1263    UnsupportedVersion,
1264}
1265
1266/// Error type for attempting to accept an [`Incoming`]
1267#[derive(Debug)]
1268pub struct AcceptError {
1269    /// Underlying error describing reason for failure
1270    pub cause: ConnectionError,
1271    /// Optional response to transmit back
1272    pub response: Option<Transmit>,
1273}
1274
1275/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1276#[derive(Debug, Error)]
1277#[error("retry() with validated Incoming")]
1278pub struct RetryError(Box<Incoming>);
1279
1280impl RetryError {
1281    /// Get the [`Incoming`]
1282    pub fn into_incoming(self) -> Incoming {
1283        *self.0
1284    }
1285}
1286
1287/// Reset Tokens which are associated with peer socket addresses
1288///
1289/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1290/// peer generated and might be usable for hash collision attacks.
1291#[derive(Default, Debug)]
1292struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1293
1294impl ResetTokenTable {
1295    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1296        self.0
1297            .entry(remote)
1298            .or_default()
1299            .insert(token, ch)
1300            .is_some()
1301    }
1302
1303    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1304        use std::collections::hash_map::Entry;
1305        match self.0.entry(remote) {
1306            Entry::Vacant(_) => {}
1307            Entry::Occupied(mut e) => {
1308                e.get_mut().remove(&token);
1309                if e.get().is_empty() {
1310                    e.remove_entry();
1311                }
1312            }
1313        }
1314    }
1315
1316    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1317        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1318        self.0.get(&remote)?.get(&token)
1319    }
1320}
1321
/// The remote and local addresses that together identify a connection
///
/// The local address is part of the key so that things behave well when the host has several
/// IP addresses on the same subnet and zero-length connection IDs are in use.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct FourTuple {
    /// Address of the peer
    remote: SocketAddr,
    /// Local IP address, when known
    // A single socket can only listen on a single port, so no need to store it explicitly
    local_ip: Option<IpAddr>,
}