1use std::{
2 collections::{HashMap, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
19 Side, Transmit, TransportConfig, TransportError,
20 cid_generator::ConnectionIdGenerator,
21 coding::BufMutExt,
22 config::{ClientConfig, EndpointConfig, ServerConfig},
23 connection::{Connection, ConnectionError, SideArgs},
24 crypto::{self, Keys, UnsupportedVersion},
25 frame,
26 packet::{
27 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28 PacketNumber, PartialDecode, ProtectedInitialHeader,
29 },
30 shared::{
31 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32 EndpointEvent, EndpointEventInner, IssuedCid,
33 },
34 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35 transport_parameters::{PreferredAddress, TransportParameters},
36};
37
38pub struct Endpoint {
43 rng: StdRng,
44 index: ConnectionIndex,
45 connections: Slab<ConnectionMeta>,
46 local_cid_generator: Box<dyn ConnectionIdGenerator>,
47 config: Arc<EndpointConfig>,
48 server_config: Option<Arc<ServerConfig>>,
49 allow_mtud: bool,
51 last_stateless_reset: Option<Instant>,
53 incoming_buffers: Slab<IncomingBuffer>,
55 all_incoming_buffers_total_bytes: u64,
56}
57
58impl Endpoint {
59 pub fn new(
70 config: Arc<EndpointConfig>,
71 server_config: Option<Arc<ServerConfig>>,
72 allow_mtud: bool,
73 rng_seed: Option<[u8; 32]>,
74 ) -> Self {
75 let rng_seed = rng_seed.or(config.rng_seed);
76 Self {
77 rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78 index: ConnectionIndex::default(),
79 connections: Slab::new(),
80 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81 config,
82 server_config,
83 allow_mtud,
84 last_stateless_reset: None,
85 incoming_buffers: Slab::new(),
86 all_incoming_buffers_total_bytes: 0,
87 }
88 }
89
90 pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
92 self.server_config = server_config;
93 }
94
95 pub fn handle_event(
99 &mut self,
100 ch: ConnectionHandle,
101 event: EndpointEvent,
102 ) -> Option<ConnectionEvent> {
103 use EndpointEventInner::*;
104 match event.0 {
105 NeedIdentifiers(now, n) => {
106 return Some(self.send_new_identifiers(now, ch, n));
107 }
108 ResetToken(remote, token) => {
109 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
110 self.index.connection_reset_tokens.remove(old.0, old.1);
111 }
112 if self.index.connection_reset_tokens.insert(remote, token, ch) {
113 warn!("duplicate reset token");
114 }
115 }
116 RetireConnectionId(now, seq, allow_more_cids) => {
117 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
118 trace!("peer retired CID {}: {}", seq, cid);
119 self.index.retire(cid);
120 if allow_more_cids {
121 return Some(self.send_new_identifiers(now, ch, 1));
122 }
123 }
124 }
125 Drained => {
126 if let Some(conn) = self.connections.try_remove(ch.0) {
127 self.index.remove(&conn);
128 } else {
129 error!(id = ch.0, "unknown connection drained");
133 }
134 }
135 }
136 None
137 }
138
139 pub fn handle(
141 &mut self,
142 now: Instant,
143 remote: SocketAddr,
144 local_ip: Option<IpAddr>,
145 ecn: Option<EcnCodepoint>,
146 data: BytesMut,
147 buf: &mut Vec<u8>,
148 ) -> Option<DatagramEvent> {
149 let datagram_len = data.len();
151 let event = match PartialDecode::new(
152 data,
153 &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
154 &self.config.supported_versions,
155 self.config.grease_quic_bit,
156 ) {
157 Ok((first_decode, remaining)) => DatagramConnectionEvent {
158 now,
159 remote,
160 ecn,
161 first_decode,
162 remaining,
163 },
164 Err(PacketDecodeError::UnsupportedVersion {
165 src_cid,
166 dst_cid,
167 version,
168 }) => {
169 if self.server_config.is_none() {
170 debug!("dropping packet with unsupported version");
171 return None;
172 }
173 trace!("sending version negotiation");
174 Header::VersionNegotiate {
176 random: self.rng.random::<u8>() | 0x40,
177 src_cid: dst_cid,
178 dst_cid: src_cid,
179 }
180 .encode(buf);
181 buf.write::<u32>(match version {
183 0x0a1a_2a3a => 0x0a1a_2a4a,
184 _ => 0x0a1a_2a3a,
185 });
186 for &version in &self.config.supported_versions {
187 buf.write(version);
188 }
189 return Some(DatagramEvent::Response(Transmit {
190 destination: remote,
191 ecn: None,
192 size: buf.len(),
193 segment_size: None,
194 src_ip: local_ip,
195 }));
196 }
197 Err(e) => {
198 trace!("malformed header: {}", e);
199 return None;
200 }
201 };
202
203 let addresses = FourTuple { remote, local_ip };
204 let dst_cid = event.first_decode.dst_cid();
205
206 if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
207 match route_to {
209 RouteDatagramTo::Incoming(incoming_idx) => {
210 let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
211 let config = &self.server_config.as_ref().unwrap();
212
213 if incoming_buffer
214 .total_bytes
215 .checked_add(datagram_len as u64)
216 .is_some_and(|n| n <= config.incoming_buffer_size)
217 && self
218 .all_incoming_buffers_total_bytes
219 .checked_add(datagram_len as u64)
220 .is_some_and(|n| n <= config.incoming_buffer_size_total)
221 {
222 incoming_buffer.datagrams.push(event);
223 incoming_buffer.total_bytes += datagram_len as u64;
224 self.all_incoming_buffers_total_bytes += datagram_len as u64;
225 }
226
227 None
228 }
229 RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
230 ch,
231 ConnectionEvent(ConnectionEventInner::Datagram(event)),
232 )),
233 }
234 } else if event.first_decode.initial_header().is_some() {
235 self.handle_first_packet(datagram_len, event, addresses, buf)
238 } else if event.first_decode.has_long_header() {
239 debug!(
240 "ignoring non-initial packet for unknown connection {}",
241 dst_cid
242 );
243 None
244 } else if !event.first_decode.is_initial()
245 && self.local_cid_generator.validate(dst_cid).is_err()
246 {
247 debug!("dropping packet with invalid CID");
248 None
249 } else if dst_cid.is_empty() {
250 trace!("dropping unrecognized short packet without ID");
251 None
252 } else {
253 self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
256 .map(DatagramEvent::Response)
257 }
258 }
259
260 fn stateless_reset(
261 &mut self,
262 now: Instant,
263 inciting_dgram_len: usize,
264 addresses: FourTuple,
265 dst_cid: ConnectionId,
266 buf: &mut Vec<u8>,
267 ) -> Option<Transmit> {
268 if self
269 .last_stateless_reset
270 .is_some_and(|last| last + self.config.min_reset_interval > now)
271 {
272 debug!("ignoring unexpected packet within minimum stateless reset interval");
273 return None;
274 }
275
276 const MIN_PADDING_LEN: usize = 5;
278
279 let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
282 Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
283 _ => {
284 debug!(
285 "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
286 inciting_dgram_len
287 );
288 return None;
289 }
290 };
291
292 debug!(
293 "sending stateless reset for {} to {}",
294 dst_cid, addresses.remote
295 );
296 self.last_stateless_reset = Some(now);
297 const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
299 let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
300 max_padding_len
301 } else {
302 self.rng
303 .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
304 };
305 buf.reserve(padding_len + RESET_TOKEN_SIZE);
306 buf.resize(padding_len, 0);
307 self.rng.fill_bytes(&mut buf[0..padding_len]);
308 buf[0] = 0b0100_0000 | (buf[0] >> 2);
309 buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
310
311 debug_assert!(buf.len() < inciting_dgram_len);
312
313 Some(Transmit {
314 destination: addresses.remote,
315 ecn: None,
316 size: buf.len(),
317 segment_size: None,
318 src_ip: addresses.local_ip,
319 })
320 }
321
322 pub fn connect(
324 &mut self,
325 now: Instant,
326 config: ClientConfig,
327 remote: SocketAddr,
328 server_name: &str,
329 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
330 if self.cids_exhausted() {
331 return Err(ConnectError::CidsExhausted);
332 }
333 if remote.port() == 0 || remote.ip().is_unspecified() {
334 return Err(ConnectError::InvalidRemoteAddress(remote));
335 }
336 if !self.config.supported_versions.contains(&config.version) {
337 return Err(ConnectError::UnsupportedVersion);
338 }
339
340 let remote_id = (config.initial_dst_cid_provider)();
341 trace!(initial_dcid = %remote_id);
342
343 let ch = ConnectionHandle(self.connections.vacant_key());
344 let loc_cid = self.new_cid(ch);
345 let params = TransportParameters::new(
346 &config.transport,
347 &self.config,
348 self.local_cid_generator.as_ref(),
349 loc_cid,
350 None,
351 &mut self.rng,
352 );
353 let tls = config
354 .crypto
355 .start_session(config.version, server_name, ¶ms)?;
356
357 let conn = self.add_connection(
358 ch,
359 config.version,
360 remote_id,
361 loc_cid,
362 remote_id,
363 FourTuple {
364 remote,
365 local_ip: None,
366 },
367 now,
368 tls,
369 config.transport,
370 SideArgs::Client {
371 token_store: config.token_store,
372 server_name: server_name.into(),
373 },
374 );
375 Ok((ch, conn))
376 }
377
378 fn send_new_identifiers(
379 &mut self,
380 now: Instant,
381 ch: ConnectionHandle,
382 num: u64,
383 ) -> ConnectionEvent {
384 let mut ids = vec![];
385 for _ in 0..num {
386 let id = self.new_cid(ch);
387 let meta = &mut self.connections[ch];
388 let sequence = meta.cids_issued;
389 meta.cids_issued += 1;
390 meta.loc_cids.insert(sequence, id);
391 ids.push(IssuedCid {
392 sequence,
393 id,
394 reset_token: ResetToken::new(&*self.config.reset_key, id),
395 });
396 }
397 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
398 }
399
400 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
402 loop {
403 let cid = self.local_cid_generator.generate_cid();
404 if cid.is_empty() {
405 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
407 return cid;
408 }
409 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
410 e.insert(ch);
411 break cid;
412 }
413 }
414 }
415
416 fn handle_first_packet(
417 &mut self,
418 datagram_len: usize,
419 event: DatagramConnectionEvent,
420 addresses: FourTuple,
421 buf: &mut Vec<u8>,
422 ) -> Option<DatagramEvent> {
423 let dst_cid = event.first_decode.dst_cid();
424 let header = event.first_decode.initial_header().unwrap();
425
426 let Some(server_config) = &self.server_config else {
427 debug!("packet for unrecognized connection {}", dst_cid);
428 return self
429 .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
430 .map(DatagramEvent::Response);
431 };
432
433 if datagram_len < MIN_INITIAL_SIZE as usize {
434 debug!("ignoring short initial for connection {}", dst_cid);
435 return None;
436 }
437
438 let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
439 Ok(keys) => keys,
440 Err(UnsupportedVersion) => {
441 debug!(
444 "ignoring initial packet version {:#x} unsupported by cryptographic layer",
445 header.version
446 );
447 return None;
448 }
449 };
450
451 if let Err(reason) = self.early_validate_first_packet(header) {
452 return Some(DatagramEvent::Response(self.initial_close(
453 header.version,
454 addresses,
455 &crypto,
456 &header.src_cid,
457 reason,
458 buf,
459 )));
460 }
461
462 let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
463 Ok(packet) => packet,
464 Err(e) => {
465 trace!("unable to decode initial packet: {}", e);
466 return None;
467 }
468 };
469
470 if !packet.reserved_bits_valid() {
471 debug!("dropping connection attempt with invalid reserved bits");
472 return None;
473 }
474
475 let Header::Initial(header) = packet.header else {
476 panic!("non-initial packet in handle_first_packet()");
477 };
478
479 let server_config = self.server_config.as_ref().unwrap().clone();
480
481 let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
482 Ok(token) => token,
483 Err(InvalidRetryTokenError) => {
484 debug!("rejecting invalid retry token");
485 return Some(DatagramEvent::Response(self.initial_close(
486 header.version,
487 addresses,
488 &crypto,
489 &header.src_cid,
490 TransportError::INVALID_TOKEN(""),
491 buf,
492 )));
493 }
494 };
495
496 let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
497 self.index
498 .insert_initial_incoming(header.dst_cid, incoming_idx);
499
500 Some(DatagramEvent::NewConnection(Incoming {
501 received_at: event.now,
502 addresses,
503 ecn: event.ecn,
504 packet: InitialPacket {
505 header,
506 header_data: packet.header_data,
507 payload: packet.payload,
508 },
509 rest: event.remaining,
510 crypto,
511 token,
512 incoming_idx,
513 improper_drop_warner: IncomingImproperDropWarner,
514 }))
515 }
516
517 #[allow(clippy::result_large_err)]
520 pub fn accept(
521 &mut self,
522 mut incoming: Incoming,
523 now: Instant,
524 buf: &mut Vec<u8>,
525 server_config: Option<Arc<ServerConfig>>,
526 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
527 let remote_address_validated = incoming.remote_address_validated();
528 incoming.improper_drop_warner.dismiss();
529 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
530 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
531
532 let packet_number = incoming.packet.header.number.expand(0);
533 let InitialHeader {
534 src_cid,
535 dst_cid,
536 version,
537 ..
538 } = incoming.packet.header;
539 let server_config =
540 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
541
542 if server_config
543 .transport
544 .max_idle_timeout
545 .is_some_and(|timeout| {
546 incoming.received_at + Duration::from_millis(timeout.into()) <= now
547 })
548 {
549 debug!("abandoning accept of stale initial");
550 self.index.remove_initial(dst_cid);
551 return Err(AcceptError {
552 cause: ConnectionError::TimedOut,
553 response: None,
554 });
555 }
556
557 if self.cids_exhausted() {
558 debug!("refusing connection");
559 self.index.remove_initial(dst_cid);
560 return Err(AcceptError {
561 cause: ConnectionError::CidsExhausted,
562 response: Some(self.initial_close(
563 version,
564 incoming.addresses,
565 &incoming.crypto,
566 &src_cid,
567 TransportError::CONNECTION_REFUSED(""),
568 buf,
569 )),
570 });
571 }
572
573 if incoming
574 .crypto
575 .packet
576 .remote
577 .decrypt(
578 packet_number,
579 &incoming.packet.header_data,
580 &mut incoming.packet.payload,
581 )
582 .is_err()
583 {
584 debug!(packet_number, "failed to authenticate initial packet");
585 self.index.remove_initial(dst_cid);
586 return Err(AcceptError {
587 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
588 response: None,
589 });
590 };
591
592 let ch = ConnectionHandle(self.connections.vacant_key());
593 let loc_cid = self.new_cid(ch);
594 let mut params = TransportParameters::new(
595 &server_config.transport,
596 &self.config,
597 self.local_cid_generator.as_ref(),
598 loc_cid,
599 Some(&server_config),
600 &mut self.rng,
601 );
602 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
603 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
604 params.retry_src_cid = incoming.token.retry_src_cid;
605 let mut pref_addr_cid = None;
606 if server_config.has_preferred_address() {
607 let cid = self.new_cid(ch);
608 pref_addr_cid = Some(cid);
609 params.preferred_address = Some(PreferredAddress {
610 address_v4: server_config.preferred_address_v4,
611 address_v6: server_config.preferred_address_v6,
612 connection_id: cid,
613 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
614 });
615 }
616
617 let tls = server_config.crypto.clone().start_session(version, ¶ms);
618 let transport_config = server_config.transport.clone();
619 let mut conn = self.add_connection(
620 ch,
621 version,
622 dst_cid,
623 loc_cid,
624 src_cid,
625 incoming.addresses,
626 incoming.received_at,
627 tls,
628 transport_config,
629 SideArgs::Server {
630 server_config,
631 pref_addr_cid,
632 path_validated: remote_address_validated,
633 },
634 );
635 self.index.insert_initial(dst_cid, ch);
636
637 match conn.handle_first_packet(
638 incoming.received_at,
639 incoming.addresses.remote,
640 incoming.ecn,
641 packet_number,
642 incoming.packet,
643 incoming.rest,
644 ) {
645 Ok(()) => {
646 trace!(id = ch.0, icid = %dst_cid, "new connection");
647
648 for event in incoming_buffer.datagrams {
649 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
650 }
651
652 Ok((ch, conn))
653 }
654 Err(e) => {
655 debug!("handshake failed: {}", e);
656 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
657 let response = match e {
658 ConnectionError::TransportError(ref e) => Some(self.initial_close(
659 version,
660 incoming.addresses,
661 &incoming.crypto,
662 &src_cid,
663 e.clone(),
664 buf,
665 )),
666 _ => None,
667 };
668 Err(AcceptError { cause: e, response })
669 }
670 }
671 }
672
673 fn early_validate_first_packet(
675 &mut self,
676 header: &ProtectedInitialHeader,
677 ) -> Result<(), TransportError> {
678 let config = &self.server_config.as_ref().unwrap();
679 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
680 return Err(TransportError::CONNECTION_REFUSED(""));
681 }
682
683 if header.dst_cid.len() < 8
688 && (header.token_pos.is_empty()
689 || header.dst_cid.len() != self.local_cid_generator.cid_len())
690 {
691 debug!(
692 "rejecting connection due to invalid DCID length {}",
693 header.dst_cid.len()
694 );
695 return Err(TransportError::PROTOCOL_VIOLATION(
696 "invalid destination CID length",
697 ));
698 }
699
700 Ok(())
701 }
702
703 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
705 self.clean_up_incoming(&incoming);
706 incoming.improper_drop_warner.dismiss();
707
708 self.initial_close(
709 incoming.packet.header.version,
710 incoming.addresses,
711 &incoming.crypto,
712 &incoming.packet.header.src_cid,
713 TransportError::CONNECTION_REFUSED(""),
714 buf,
715 )
716 }
717
718 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
722 if !incoming.may_retry() {
723 return Err(RetryError(Box::new(incoming)));
724 }
725
726 self.clean_up_incoming(&incoming);
727 incoming.improper_drop_warner.dismiss();
728
729 let server_config = self.server_config.as_ref().unwrap();
730
731 let loc_cid = self.local_cid_generator.generate_cid();
738
739 let payload = TokenPayload::Retry {
740 address: incoming.addresses.remote,
741 orig_dst_cid: incoming.packet.header.dst_cid,
742 issued: server_config.time_source.now(),
743 };
744 let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
745
746 let header = Header::Retry {
747 src_cid: loc_cid,
748 dst_cid: incoming.packet.header.src_cid,
749 version: incoming.packet.header.version,
750 };
751
752 let encode = header.encode(buf);
753 buf.put_slice(&token);
754 buf.extend_from_slice(&server_config.crypto.retry_tag(
755 incoming.packet.header.version,
756 &incoming.packet.header.dst_cid,
757 buf,
758 ));
759 encode.finish(buf, &*incoming.crypto.header.local, None);
760
761 Ok(Transmit {
762 destination: incoming.addresses.remote,
763 ecn: None,
764 size: buf.len(),
765 segment_size: None,
766 src_ip: incoming.addresses.local_ip,
767 })
768 }
769
770 pub fn ignore(&mut self, incoming: Incoming) {
775 self.clean_up_incoming(&incoming);
776 incoming.improper_drop_warner.dismiss();
777 }
778
779 fn clean_up_incoming(&mut self, incoming: &Incoming) {
781 self.index.remove_initial(incoming.packet.header.dst_cid);
782 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
783 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
784 }
785
786 fn add_connection(
787 &mut self,
788 ch: ConnectionHandle,
789 version: u32,
790 init_cid: ConnectionId,
791 loc_cid: ConnectionId,
792 rem_cid: ConnectionId,
793 addresses: FourTuple,
794 now: Instant,
795 tls: Box<dyn crypto::Session>,
796 transport_config: Arc<TransportConfig>,
797 side_args: SideArgs,
798 ) -> Connection {
799 let mut rng_seed = [0; 32];
800 self.rng.fill_bytes(&mut rng_seed);
801 let side = side_args.side();
802 let pref_addr_cid = side_args.pref_addr_cid();
803 let conn = Connection::new(
804 self.config.clone(),
805 transport_config,
806 init_cid,
807 loc_cid,
808 rem_cid,
809 addresses.remote,
810 addresses.local_ip,
811 tls,
812 self.local_cid_generator.as_ref(),
813 now,
814 version,
815 self.allow_mtud,
816 rng_seed,
817 side_args,
818 );
819
820 let mut cids_issued = 0;
821 let mut loc_cids = FxHashMap::default();
822
823 loc_cids.insert(cids_issued, loc_cid);
824 cids_issued += 1;
825
826 if let Some(cid) = pref_addr_cid {
827 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
828 loc_cids.insert(cids_issued, cid);
829 cids_issued += 1;
830 }
831
832 let id = self.connections.insert(ConnectionMeta {
833 init_cid,
834 cids_issued,
835 loc_cids,
836 addresses,
837 side,
838 reset_token: None,
839 });
840 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
841
842 self.index.insert_conn(addresses, loc_cid, ch, side);
843
844 conn
845 }
846
847 fn initial_close(
848 &mut self,
849 version: u32,
850 addresses: FourTuple,
851 crypto: &Keys,
852 remote_id: &ConnectionId,
853 reason: TransportError,
854 buf: &mut Vec<u8>,
855 ) -> Transmit {
856 let local_id = self.local_cid_generator.generate_cid();
860 let number = PacketNumber::U8(0);
861 let header = Header::Initial(InitialHeader {
862 dst_cid: *remote_id,
863 src_cid: local_id,
864 number,
865 token: Bytes::new(),
866 version,
867 });
868
869 let partial_encode = header.encode(buf);
870 let max_len =
871 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
872 frame::Close::from(reason).encode(buf, max_len);
873 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
874 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
875 Transmit {
876 destination: addresses.remote,
877 ecn: None,
878 size: buf.len(),
879 segment_size: None,
880 src_ip: addresses.local_ip,
881 }
882 }
883
884 pub fn config(&self) -> &EndpointConfig {
886 &self.config
887 }
888
889 pub fn open_connections(&self) -> usize {
891 self.connections.len()
892 }
893
894 pub fn incoming_buffer_bytes(&self) -> u64 {
897 self.all_incoming_buffers_total_bytes
898 }
899
900 #[cfg(test)]
901 pub(crate) fn known_connections(&self) -> usize {
902 let x = self.connections.len();
903 debug_assert_eq!(x, self.index.connection_ids_initial.len());
904 debug_assert!(x >= self.index.connection_reset_tokens.0.len());
906 debug_assert!(x >= self.index.incoming_connection_remotes.len());
908 debug_assert!(x >= self.index.outgoing_connection_remotes.len());
909 x
910 }
911
912 #[cfg(test)]
913 pub(crate) fn known_cids(&self) -> usize {
914 self.index.connection_ids.len()
915 }
916
917 fn cids_exhausted(&self) -> bool {
922 self.local_cid_generator.cid_len() <= 4
923 && self.local_cid_generator.cid_len() != 0
924 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
925 - self.index.connection_ids.len())
926 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
927 }
928}
929
930impl fmt::Debug for Endpoint {
931 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
932 fmt.debug_struct("Endpoint")
933 .field("rng", &self.rng)
934 .field("index", &self.index)
935 .field("connections", &self.connections)
936 .field("config", &self.config)
937 .field("server_config", &self.server_config)
938 .field("incoming_buffers.len", &self.incoming_buffers.len())
940 .field(
941 "all_incoming_buffers_total_bytes",
942 &self.all_incoming_buffers_total_bytes,
943 )
944 .finish()
945 }
946}
947
948#[derive(Default)]
950struct IncomingBuffer {
951 datagrams: Vec<DatagramConnectionEvent>,
952 total_bytes: u64,
953}
954
955#[derive(Copy, Clone, Debug)]
957enum RouteDatagramTo {
958 Incoming(usize),
959 Connection(ConnectionHandle),
960}
961
962#[derive(Default, Debug)]
964struct ConnectionIndex {
965 connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
971 connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
975 incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
979 outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
988 connection_reset_tokens: ResetTokenTable,
993}
994
995impl ConnectionIndex {
996 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
998 if dst_cid.is_empty() {
999 return;
1000 }
1001 self.connection_ids_initial
1002 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1003 }
1004
1005 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1007 if dst_cid.is_empty() {
1008 return;
1009 }
1010 let removed = self.connection_ids_initial.remove(&dst_cid);
1011 debug_assert!(removed.is_some());
1012 }
1013
1014 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1016 if dst_cid.is_empty() {
1017 return;
1018 }
1019 self.connection_ids_initial
1020 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1021 }
1022
1023 fn insert_conn(
1026 &mut self,
1027 addresses: FourTuple,
1028 dst_cid: ConnectionId,
1029 connection: ConnectionHandle,
1030 side: Side,
1031 ) {
1032 match dst_cid.len() {
1033 0 => match side {
1034 Side::Server => {
1035 self.incoming_connection_remotes
1036 .insert(addresses, connection);
1037 }
1038 Side::Client => {
1039 self.outgoing_connection_remotes
1040 .insert(addresses.remote, connection);
1041 }
1042 },
1043 _ => {
1044 self.connection_ids.insert(dst_cid, connection);
1045 }
1046 }
1047 }
1048
1049 fn retire(&mut self, dst_cid: ConnectionId) {
1051 self.connection_ids.remove(&dst_cid);
1052 }
1053
1054 fn remove(&mut self, conn: &ConnectionMeta) {
1056 if conn.side.is_server() {
1057 self.remove_initial(conn.init_cid);
1058 }
1059 for cid in conn.loc_cids.values() {
1060 self.connection_ids.remove(cid);
1061 }
1062 self.incoming_connection_remotes.remove(&conn.addresses);
1063 self.outgoing_connection_remotes
1064 .remove(&conn.addresses.remote);
1065 if let Some((remote, token)) = conn.reset_token {
1066 self.connection_reset_tokens.remove(remote, token);
1067 }
1068 }
1069
1070 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1072 if !datagram.dst_cid().is_empty() {
1073 if let Some(&ch) = self.connection_ids.get(datagram.dst_cid()) {
1074 return Some(RouteDatagramTo::Connection(ch));
1075 }
1076 }
1077 if datagram.is_initial() || datagram.is_0rtt() {
1078 if let Some(&ch) = self.connection_ids_initial.get(datagram.dst_cid()) {
1079 return Some(ch);
1080 }
1081 }
1082 if datagram.dst_cid().is_empty() {
1083 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1084 return Some(RouteDatagramTo::Connection(ch));
1085 }
1086 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1087 return Some(RouteDatagramTo::Connection(ch));
1088 }
1089 }
1090 let data = datagram.data();
1091 if data.len() < RESET_TOKEN_SIZE {
1092 return None;
1093 }
1094 self.connection_reset_tokens
1095 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1096 .cloned()
1097 .map(RouteDatagramTo::Connection)
1098 }
1099}
1100
1101#[derive(Debug)]
1102pub(crate) struct ConnectionMeta {
1103 init_cid: ConnectionId,
1104 cids_issued: u64,
1106 loc_cids: FxHashMap<u64, ConnectionId>,
1107 addresses: FourTuple,
1112 side: Side,
1113 reset_token: Option<(SocketAddr, ResetToken)>,
1116}
1117
1118#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1120pub struct ConnectionHandle(pub usize);
1121
1122impl From<ConnectionHandle> for usize {
1123 fn from(x: ConnectionHandle) -> Self {
1124 x.0
1125 }
1126}
1127
1128impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1129 type Output = ConnectionMeta;
1130 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1131 &self[ch.0]
1132 }
1133}
1134
1135impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1136 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1137 &mut self[ch.0]
1138 }
1139}
1140
1141pub enum DatagramEvent {
1143 ConnectionEvent(ConnectionHandle, ConnectionEvent),
1145 NewConnection(Incoming),
1147 Response(Transmit),
1149}
1150
1151pub struct Incoming {
1153 received_at: Instant,
1154 addresses: FourTuple,
1155 ecn: Option<EcnCodepoint>,
1156 packet: InitialPacket,
1157 rest: Option<BytesMut>,
1158 crypto: Keys,
1159 token: IncomingToken,
1160 incoming_idx: usize,
1161 improper_drop_warner: IncomingImproperDropWarner,
1162}
1163
1164impl Incoming {
1165 pub fn local_ip(&self) -> Option<IpAddr> {
1169 self.addresses.local_ip
1170 }
1171
1172 pub fn remote_address(&self) -> SocketAddr {
1174 self.addresses.remote
1175 }
1176
1177 pub fn remote_address_validated(&self) -> bool {
1185 self.token.validated
1186 }
1187
1188 pub fn may_retry(&self) -> bool {
1193 self.token.retry_src_cid.is_none()
1194 }
1195
1196 pub fn orig_dst_cid(&self) -> &ConnectionId {
1198 &self.token.orig_dst_cid
1199 }
1200}
1201
1202impl fmt::Debug for Incoming {
1203 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1204 f.debug_struct("Incoming")
1205 .field("addresses", &self.addresses)
1206 .field("ecn", &self.ecn)
1207 .field("token", &self.token)
1210 .field("incoming_idx", &self.incoming_idx)
1211 .finish_non_exhaustive()
1213 }
1214}
1215
1216struct IncomingImproperDropWarner;
1217
1218impl IncomingImproperDropWarner {
1219 fn dismiss(self) {
1220 mem::forget(self);
1221 }
1222}
1223
1224impl Drop for IncomingImproperDropWarner {
1225 fn drop(&mut self) {
1226 warn!(
1227 "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1228 (may cause memory leak and eventual inability to accept new connections)"
1229 );
1230 }
1231}
1232
1233#[derive(Debug, Error, Clone, PartialEq, Eq)]
1237pub enum ConnectError {
1238 #[error("endpoint stopping")]
1242 EndpointStopping,
1243 #[error("CIDs exhausted")]
1247 CidsExhausted,
1248 #[error("invalid server name: {0}")]
1250 InvalidServerName(String),
1251 #[error("invalid remote address: {0}")]
1255 InvalidRemoteAddress(SocketAddr),
1256 #[error("no default client config")]
1260 NoDefaultClientConfig,
1261 #[error("unsupported QUIC version")]
1263 UnsupportedVersion,
1264}
1265
1266#[derive(Debug)]
1268pub struct AcceptError {
1269 pub cause: ConnectionError,
1271 pub response: Option<Transmit>,
1273}
1274
1275#[derive(Debug, Error)]
1277#[error("retry() with validated Incoming")]
1278pub struct RetryError(Box<Incoming>);
1279
1280impl RetryError {
1281 pub fn into_incoming(self) -> Incoming {
1283 *self.0
1284 }
1285}
1286
1287#[derive(Default, Debug)]
1292struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1293
1294impl ResetTokenTable {
1295 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1296 self.0
1297 .entry(remote)
1298 .or_default()
1299 .insert(token, ch)
1300 .is_some()
1301 }
1302
1303 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1304 use std::collections::hash_map::Entry;
1305 match self.0.entry(remote) {
1306 Entry::Vacant(_) => {}
1307 Entry::Occupied(mut e) => {
1308 e.get_mut().remove(&token);
1309 if e.get().is_empty() {
1310 e.remove_entry();
1311 }
1312 }
1313 }
1314 }
1315
1316 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1317 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1318 self.0.get(&remote)?.get(&token)
1319 }
1320}
1321
1322#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
1327struct FourTuple {
1328 remote: SocketAddr,
1329 local_ip: Option<IpAddr>,
1331}