1use std::{
2 collections::{HashMap, VecDeque, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21 Side, Transmit, TransportConfig, TransportError,
22 cid_generator::ConnectionIdGenerator,
23 coding::BufMutExt,
24 config::{ClientConfig, EndpointConfig, ServerConfig},
25 connection::{Connection, ConnectionError, SideArgs},
26 crypto::{self, Keys, UnsupportedVersion},
27 frame,
28 nat_traversal_api::PeerId,
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31 PacketNumber, PartialDecode, ProtectedInitialHeader,
32 },
33 shared::{
34 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35 EndpointEvent, EndpointEventInner, IssuedCid,
36 },
37 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38 transport_parameters::{PreferredAddress, TransportParameters},
39};
40
41#[derive(Debug, Clone)]
43struct RelayQueueItem {
44 target_peer_id: PeerId,
46 frame: frame::PunchMeNow,
48 created_at: Instant,
50 attempts: u32,
52 last_attempt: Option<Instant>,
54}
55
56#[derive(Debug)]
58struct RelayQueue {
59 pending: IndexMap<u64, RelayQueueItem>,
61 next_seq: u64,
63 max_queue_size: usize,
65 request_timeout: Duration,
67 max_retries: u32,
69 retry_interval: Duration,
71 rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
73 max_relays_per_peer: usize,
75 rate_limit_window: Duration,
77}
78
79#[derive(Debug, Default, Clone)]
81pub struct AddressDiscoveryStats {
82 pub frames_sent: u64,
84 pub frames_received: u64,
86 pub addresses_discovered: u64,
88 pub address_changes_detected: u64,
90}
91
92#[derive(Debug, Default)]
94pub struct RelayStats {
95 requests_received: u64,
97 requests_relayed: u64,
99 requests_failed: u64,
101 requests_dropped: u64,
103 requests_timed_out: u64,
105 requests_rate_limited: u64,
107 current_queue_size: usize,
109}
110
111impl RelayQueue {
112 fn new() -> Self {
114 Self {
115 pending: IndexMap::new(),
116 next_seq: 0,
117 max_queue_size: 1000, request_timeout: Duration::from_secs(30), max_retries: 3,
120 retry_interval: Duration::from_millis(500), rate_limiter: HashMap::new(),
122 max_relays_per_peer: 10, rate_limit_window: Duration::from_secs(60), }
125 }
126
127 fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
129 if self.pending.len() >= self.max_queue_size {
131 warn!(
132 "Relay queue full, dropping request for peer {:?}",
133 target_peer_id
134 );
135 return false;
136 }
137
138 if !self.check_rate_limit(target_peer_id, now) {
140 warn!(
141 "Rate limit exceeded for peer {:?}, dropping relay request",
142 target_peer_id
143 );
144 return false;
145 }
146
147 let item = RelayQueueItem {
148 target_peer_id,
149 frame,
150 created_at: now,
151 attempts: 0,
152 last_attempt: None,
153 };
154
155 let seq = self.next_seq;
156 self.next_seq += 1;
157 self.pending.insert(seq, item);
158
159 self.record_relay_request(target_peer_id, now);
161
162 trace!(
163 "Queued relay request for peer {:?}, queue size: {}",
164 target_peer_id,
165 self.pending.len()
166 );
167 true
168 }
169
170 fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
172 self.cleanup_rate_limiter(now);
174
175 if let Some(requests) = self.rate_limiter.get(&peer_id) {
177 requests.len() < self.max_relays_per_peer
178 } else {
179 true }
181 }
182
183 fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
185 self.rate_limiter.entry(peer_id).or_default().push_back(now);
186 }
187
188 fn cleanup_rate_limiter(&mut self, now: Instant) {
190 self.rate_limiter.retain(|_, requests| {
191 requests.retain(|&request_time| {
192 now.saturating_duration_since(request_time) <= self.rate_limit_window
193 });
194 !requests.is_empty()
195 });
196 }
197
198 fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
200 let mut expired_keys = Vec::new();
202 let mut ready_key = None;
203
204 for (seq, item) in &self.pending {
205 if now.saturating_duration_since(item.created_at) > self.request_timeout {
207 expired_keys.push(*seq);
208 continue;
209 }
210
211 if item.attempts == 0
213 || item
214 .last_attempt
215 .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
216 {
217 ready_key = Some(*seq);
218 break;
219 }
220 }
221
222 for key in expired_keys {
224 if let Some(expired) = self.pending.shift_remove(&key) {
225 debug!(
226 "Relay request for peer {:?} timed out after {:?}",
227 expired.target_peer_id,
228 now.saturating_duration_since(expired.created_at)
229 );
230 }
231 }
232
233 if let Some(key) = ready_key {
235 if let Some(mut item) = self.pending.shift_remove(&key) {
236 item.attempts += 1;
237 item.last_attempt = Some(now);
238 return Some(item);
239 }
240 }
241
242 None
243 }
244
245 fn requeue_failed(&mut self, item: RelayQueueItem) {
247 if item.attempts < self.max_retries {
248 trace!(
249 "Requeuing failed relay request for peer {:?}, attempt {}/{}",
250 item.target_peer_id, item.attempts, self.max_retries
251 );
252 let seq = self.next_seq;
253 self.next_seq += 1;
254 self.pending.insert(seq, item);
255 } else {
256 debug!(
257 "Dropping relay request for peer {:?} after {} failed attempts",
258 item.target_peer_id, item.attempts
259 );
260 }
261 }
262
263 fn cleanup_expired(&mut self, now: Instant) -> usize {
265 let initial_len = self.pending.len();
266
267 let expired_keys: Vec<u64> = self
269 .pending
270 .iter()
271 .filter_map(|(seq, item)| {
272 if now.saturating_duration_since(item.created_at) > self.request_timeout {
273 Some(*seq)
274 } else {
275 None
276 }
277 })
278 .collect();
279
280 for key in expired_keys {
282 if let Some(expired) = self.pending.shift_remove(&key) {
283 debug!(
284 "Removing expired relay request for peer {:?}",
285 expired.target_peer_id
286 );
287 }
288 }
289
290 initial_len - self.pending.len()
291 }
292
293 fn len(&self) -> usize {
295 self.pending.len()
296 }
297}
298
299pub struct Endpoint {
304 rng: StdRng,
305 index: ConnectionIndex,
306 connections: Slab<ConnectionMeta>,
307 local_cid_generator: Box<dyn ConnectionIdGenerator>,
308 config: Arc<EndpointConfig>,
309 server_config: Option<Arc<ServerConfig>>,
310 allow_mtud: bool,
312 last_stateless_reset: Option<Instant>,
314 incoming_buffers: Slab<IncomingBuffer>,
316 all_incoming_buffers_total_bytes: u64,
317 peer_connections: HashMap<PeerId, ConnectionHandle>,
319 relay_queue: RelayQueue,
321 relay_stats: RelayStats,
323 address_discovery_enabled: bool,
325 address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
327}
328
329impl Endpoint {
330 pub fn new(
341 config: Arc<EndpointConfig>,
342 server_config: Option<Arc<ServerConfig>>,
343 allow_mtud: bool,
344 rng_seed: Option<[u8; 32]>,
345 ) -> Self {
346 let rng_seed = rng_seed.or(config.rng_seed);
347 Self {
348 rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
349 index: ConnectionIndex::default(),
350 connections: Slab::new(),
351 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
352 config,
353 server_config,
354 allow_mtud,
355 last_stateless_reset: None,
356 incoming_buffers: Slab::new(),
357 all_incoming_buffers_total_bytes: 0,
358 peer_connections: HashMap::new(),
359 relay_queue: RelayQueue::new(),
360 relay_stats: RelayStats::default(),
361 address_discovery_enabled: true, address_change_callback: None,
363 }
364 }
365
366 pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
368 self.server_config = server_config;
369 }
370
371 pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
373 self.peer_connections.insert(peer_id, connection_handle);
374 trace!(
375 "Registered peer {:?} with connection {:?}",
376 peer_id, connection_handle
377 );
378 }
379
380 pub fn unregister_peer(&mut self, peer_id: &PeerId) {
382 if let Some(handle) = self.peer_connections.remove(peer_id) {
383 trace!(
384 "Unregistered peer {:?} from connection {:?}",
385 peer_id, handle
386 );
387 }
388 }
389
390 pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
392 self.peer_connections.get(peer_id).copied()
393 }
394
395 pub(crate) fn queue_frame_for_peer(
397 &mut self,
398 peer_id: &PeerId,
399 frame: frame::PunchMeNow,
400 ) -> bool {
401 self.relay_stats.requests_received += 1;
402
403 if let Some(ch) = self.lookup_peer_connection(peer_id) {
404 if self.relay_frame_to_connection(ch, frame.clone()) {
406 self.relay_stats.requests_relayed += 1;
407 trace!(
408 "Immediately relayed frame to peer {:?} via connection {:?}",
409 peer_id, ch
410 );
411 return true;
412 }
413 }
414
415 let now = Instant::now();
417 if self.relay_queue.enqueue(*peer_id, frame, now) {
418 self.relay_stats.current_queue_size = self.relay_queue.len();
419 trace!("Queued relay request for peer {:?}", peer_id);
420 true
421 } else {
422 if !self.relay_queue.check_rate_limit(*peer_id, now) {
424 self.relay_stats.requests_rate_limited += 1;
425 } else {
426 self.relay_stats.requests_dropped += 1;
427 }
428 false
429 }
430 }
431
432 fn relay_frame_to_connection(
434 &mut self,
435 ch: ConnectionHandle,
436 frame: frame::PunchMeNow,
437 ) -> bool {
438 let _event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame));
440 if let Some(_conn) = self.connections.get_mut(ch.0) {
441 }
448 true
452 }
453
454 pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
456 if let Some(connection) = self.connections.get_mut(connection_handle.0) {
457 connection.peer_id = Some(peer_id);
458 self.register_peer(peer_id, connection_handle);
459
460 self.process_queued_relays_for_peer(peer_id);
462 }
463 }
464
465 fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
467 let _now = Instant::now();
468 let mut processed = 0;
469
470 let mut items_to_process = Vec::new();
472 let mut keys_to_remove = Vec::new();
473
474 for (seq, item) in &self.relay_queue.pending {
476 if item.target_peer_id == peer_id {
477 items_to_process.push(item.clone());
478 keys_to_remove.push(*seq);
479 }
480 }
481
482 for key in keys_to_remove {
484 self.relay_queue.pending.shift_remove(&key);
485 }
486
487 for item in items_to_process {
489 if let Some(ch) = self.lookup_peer_connection(&peer_id) {
490 if self.relay_frame_to_connection(ch, item.frame.clone()) {
491 self.relay_stats.requests_relayed += 1;
492 processed += 1;
493 trace!("Processed queued relay for peer {:?}", peer_id);
494 } else {
495 self.relay_queue.requeue_failed(item);
497 self.relay_stats.requests_failed += 1;
498 }
499 }
500 }
501
502 self.relay_stats.current_queue_size = self.relay_queue.len();
503
504 if processed > 0 {
505 debug!(
506 "Processed {} queued relay requests for peer {:?}",
507 processed, peer_id
508 );
509 }
510 }
511
512 pub fn process_relay_queue(&mut self) {
514 let now = Instant::now();
515 let mut processed = 0;
516 let mut failed = 0;
517
518 while let Some(item) = self.relay_queue.next_ready(now) {
520 if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
521 if self.relay_frame_to_connection(ch, item.frame.clone()) {
522 self.relay_stats.requests_relayed += 1;
523 processed += 1;
524 trace!(
525 "Successfully relayed frame to peer {:?}",
526 item.target_peer_id
527 );
528 } else {
529 self.relay_queue.requeue_failed(item);
531 self.relay_stats.requests_failed += 1;
532 failed += 1;
533 }
534 } else {
535 self.relay_queue.requeue_failed(item);
537 failed += 1;
538 }
539 }
540
541 let expired = self.relay_queue.cleanup_expired(now);
543 if expired > 0 {
544 self.relay_stats.requests_timed_out += expired as u64;
545 debug!("Cleaned up {} expired relay requests", expired);
546 }
547
548 self.relay_stats.current_queue_size = self.relay_queue.len();
549
550 if processed > 0 || failed > 0 {
551 trace!(
552 "Relay queue processing: {} processed, {} failed, {} in queue",
553 processed,
554 failed,
555 self.relay_queue.len()
556 );
557 }
558 }
559
560 pub fn relay_stats(&self) -> &RelayStats {
562 &self.relay_stats
563 }
564
565 pub fn relay_queue_len(&self) -> usize {
567 self.relay_queue.len()
568 }
569
570 pub fn handle_event(
574 &mut self,
575 ch: ConnectionHandle,
576 event: EndpointEvent,
577 ) -> Option<ConnectionEvent> {
578 use EndpointEventInner::*;
579 match event.0 {
580 EndpointEventInner::NeedIdentifiers(now, n) => {
581 return Some(self.send_new_identifiers(now, ch, n));
582 }
583 ResetToken(remote, token) => {
584 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
585 self.index.connection_reset_tokens.remove(old.0, old.1);
586 }
587 if self.index.connection_reset_tokens.insert(remote, token, ch) {
588 warn!("duplicate reset token");
589 }
590 }
591 RetireConnectionId(now, seq, allow_more_cids) => {
592 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
593 trace!("peer retired CID {}: {}", seq, cid);
594 self.index.retire(cid);
595 if allow_more_cids {
596 return Some(self.send_new_identifiers(now, ch, 1));
597 }
598 }
599 }
600 RelayPunchMeNow(target_peer_id, punch_me_now) => {
601 let peer_id = PeerId(target_peer_id);
603 if self.queue_frame_for_peer(&peer_id, punch_me_now) {
604 trace!(
605 "Successfully queued PunchMeNow frame for relay to peer {:?}",
606 peer_id
607 );
608 } else {
609 warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
610 }
611 }
612 SendAddressFrame(add_address_frame) => {
613 return Some(ConnectionEvent(ConnectionEventInner::QueueAddAddress(
615 add_address_frame,
616 )));
617 }
618 NatCandidateValidated { address, challenge } => {
619 trace!(
621 "NAT candidate validation succeeded for {} with challenge {:016x}",
622 address, challenge
623 );
624
625 debug!("NAT candidate {} validated successfully", address);
629 }
630 Drained => {
631 if let Some(conn) = self.connections.try_remove(ch.0) {
632 self.index.remove(&conn);
633 if let Some(peer_id) = conn.peer_id {
635 self.peer_connections.remove(&peer_id);
636 trace!("Cleaned up peer connection mapping for {:?}", peer_id);
637 }
638 } else {
639 error!(id = ch.0, "unknown connection drained");
643 }
644 }
645 }
646 None
647 }
648
649 pub fn handle(
651 &mut self,
652 now: Instant,
653 remote: SocketAddr,
654 local_ip: Option<IpAddr>,
655 ecn: Option<EcnCodepoint>,
656 data: BytesMut,
657 buf: &mut Vec<u8>,
658 ) -> Option<DatagramEvent> {
659 let datagram_len = data.len();
661 let event = match PartialDecode::new(
662 data,
663 &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
664 &self.config.supported_versions,
665 self.config.grease_quic_bit,
666 ) {
667 Ok((first_decode, remaining)) => DatagramConnectionEvent {
668 now,
669 remote,
670 ecn,
671 first_decode,
672 remaining,
673 },
674 Err(PacketDecodeError::UnsupportedVersion {
675 src_cid,
676 dst_cid,
677 version,
678 }) => {
679 if self.server_config.is_none() {
680 debug!("dropping packet with unsupported version");
681 return None;
682 }
683 trace!("sending version negotiation");
684 Header::VersionNegotiate {
686 random: self.rng.r#gen::<u8>() | 0x40,
687 src_cid: dst_cid,
688 dst_cid: src_cid,
689 }
690 .encode(buf);
691 buf.write::<u32>(match version {
693 0x0a1a_2a3a => 0x0a1a_2a4a,
694 _ => 0x0a1a_2a3a,
695 });
696 for &version in &self.config.supported_versions {
697 buf.write(version);
698 }
699 return Some(DatagramEvent::Response(Transmit {
700 destination: remote,
701 ecn: None,
702 size: buf.len(),
703 segment_size: None,
704 src_ip: local_ip,
705 }));
706 }
707 Err(e) => {
708 trace!("malformed header: {}", e);
709 return None;
710 }
711 };
712
713 let addresses = FourTuple { remote, local_ip };
714 let dst_cid = event.first_decode.dst_cid();
715
716 if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
717 match route_to {
719 RouteDatagramTo::Incoming(incoming_idx) => {
720 let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
721 let config = &self.server_config.as_ref().unwrap();
722
723 if incoming_buffer
724 .total_bytes
725 .checked_add(datagram_len as u64)
726 .is_some_and(|n| n <= config.incoming_buffer_size)
727 && self
728 .all_incoming_buffers_total_bytes
729 .checked_add(datagram_len as u64)
730 .is_some_and(|n| n <= config.incoming_buffer_size_total)
731 {
732 incoming_buffer.datagrams.push(event);
733 incoming_buffer.total_bytes += datagram_len as u64;
734 self.all_incoming_buffers_total_bytes += datagram_len as u64;
735 }
736
737 None
738 }
739 RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
740 ch,
741 ConnectionEvent(ConnectionEventInner::Datagram(event)),
742 )),
743 }
744 } else if event.first_decode.initial_header().is_some() {
745 self.handle_first_packet(datagram_len, event, addresses, buf)
748 } else if event.first_decode.has_long_header() {
749 debug!(
750 "ignoring non-initial packet for unknown connection {}",
751 dst_cid
752 );
753 None
754 } else if !event.first_decode.is_initial()
755 && self.local_cid_generator.validate(dst_cid).is_err()
756 {
757 debug!("dropping packet with invalid CID");
761 None
762 } else if dst_cid.is_empty() {
763 trace!("dropping unrecognized short packet without ID");
764 None
765 } else {
766 self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
767 .map(DatagramEvent::Response)
768 }
769 }
770
771 fn stateless_reset(
772 &mut self,
773 now: Instant,
774 inciting_dgram_len: usize,
775 addresses: FourTuple,
776 dst_cid: ConnectionId,
777 buf: &mut Vec<u8>,
778 ) -> Option<Transmit> {
779 if self
780 .last_stateless_reset
781 .is_some_and(|last| last + self.config.min_reset_interval > now)
782 {
783 debug!("ignoring unexpected packet within minimum stateless reset interval");
784 return None;
785 }
786
787 const MIN_PADDING_LEN: usize = 5;
789
790 let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
793 Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
794 _ => {
795 debug!(
796 "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
797 inciting_dgram_len
798 );
799 return None;
800 }
801 };
802
803 debug!(
804 "sending stateless reset for {} to {}",
805 dst_cid, addresses.remote
806 );
807 self.last_stateless_reset = Some(now);
808 const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
810 let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
811 max_padding_len
812 } else {
813 self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
814 };
815 buf.reserve(padding_len + RESET_TOKEN_SIZE);
816 buf.resize(padding_len, 0);
817 self.rng.fill_bytes(&mut buf[0..padding_len]);
818 buf[0] = 0b0100_0000 | (buf[0] >> 2);
819 buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
820
821 debug_assert!(buf.len() < inciting_dgram_len);
822
823 Some(Transmit {
824 destination: addresses.remote,
825 ecn: None,
826 size: buf.len(),
827 segment_size: None,
828 src_ip: addresses.local_ip,
829 })
830 }
831
832 pub fn connect(
834 &mut self,
835 now: Instant,
836 config: ClientConfig,
837 remote: SocketAddr,
838 server_name: &str,
839 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
840 if self.cids_exhausted() {
841 return Err(ConnectError::CidsExhausted);
842 }
843 if remote.port() == 0 || remote.ip().is_unspecified() {
844 return Err(ConnectError::InvalidRemoteAddress(remote));
845 }
846 if !self.config.supported_versions.contains(&config.version) {
847 return Err(ConnectError::UnsupportedVersion);
848 }
849
850 let remote_id = (config.initial_dst_cid_provider)();
851 trace!(initial_dcid = %remote_id);
852
853 let ch = ConnectionHandle(self.connections.vacant_key());
854 let loc_cid = self.new_cid(ch);
855 let params = TransportParameters::new(
856 &config.transport,
857 &self.config,
858 self.local_cid_generator.as_ref(),
859 loc_cid,
860 None,
861 &mut self.rng,
862 );
863 let tls = config
864 .crypto
865 .start_session(config.version, server_name, ¶ms)?;
866
867 let conn = self.add_connection(
868 ch,
869 config.version,
870 remote_id,
871 loc_cid,
872 remote_id,
873 FourTuple {
874 remote,
875 local_ip: None,
876 },
877 now,
878 tls,
879 config.transport,
880 SideArgs::Client {
881 token_store: config.token_store,
882 server_name: server_name.into(),
883 },
884 );
885 Ok((ch, conn))
886 }
887
888 fn send_new_identifiers(
889 &mut self,
890 now: Instant,
891 ch: ConnectionHandle,
892 num: u64,
893 ) -> ConnectionEvent {
894 let mut ids = vec![];
895 for _ in 0..num {
896 let id = self.new_cid(ch);
897 let meta = &mut self.connections[ch];
898 let sequence = meta.cids_issued;
899 meta.cids_issued += 1;
900 meta.loc_cids.insert(sequence, id);
901 ids.push(IssuedCid {
902 sequence,
903 id,
904 reset_token: ResetToken::new(&*self.config.reset_key, id),
905 });
906 }
907 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
908 }
909
910 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
912 loop {
913 let cid = self.local_cid_generator.generate_cid();
914 if cid.is_empty() {
915 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
917 return cid;
918 }
919 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
920 e.insert(ch);
921 break cid;
922 }
923 }
924 }
925
926 fn handle_first_packet(
927 &mut self,
928 datagram_len: usize,
929 event: DatagramConnectionEvent,
930 addresses: FourTuple,
931 buf: &mut Vec<u8>,
932 ) -> Option<DatagramEvent> {
933 let dst_cid = event.first_decode.dst_cid();
934 let header = event.first_decode.initial_header().unwrap();
935
936 let Some(server_config) = &self.server_config else {
937 debug!("packet for unrecognized connection {}", dst_cid);
938 return self
939 .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
940 .map(DatagramEvent::Response);
941 };
942
943 if datagram_len < MIN_INITIAL_SIZE as usize {
944 debug!("ignoring short initial for connection {}", dst_cid);
945 return None;
946 }
947
948 let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
949 Ok(keys) => keys,
950 Err(UnsupportedVersion) => {
951 debug!(
954 "ignoring initial packet version {:#x} unsupported by cryptographic layer",
955 header.version
956 );
957 return None;
958 }
959 };
960
961 if let Err(reason) = self.early_validate_first_packet(header) {
962 return Some(DatagramEvent::Response(self.initial_close(
963 header.version,
964 addresses,
965 &crypto,
966 &header.src_cid,
967 reason,
968 buf,
969 )));
970 }
971
972 let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
973 Ok(packet) => packet,
974 Err(e) => {
975 trace!("unable to decode initial packet: {}", e);
976 return None;
977 }
978 };
979
980 if !packet.reserved_bits_valid() {
981 debug!("dropping connection attempt with invalid reserved bits");
982 return None;
983 }
984
985 let Header::Initial(header) = packet.header else {
986 panic!("non-initial packet in handle_first_packet()");
987 };
988
989 let server_config = self.server_config.as_ref().unwrap().clone();
990
991 let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
992 Ok(token) => token,
993 Err(InvalidRetryTokenError) => {
994 debug!("rejecting invalid retry token");
995 return Some(DatagramEvent::Response(self.initial_close(
996 header.version,
997 addresses,
998 &crypto,
999 &header.src_cid,
1000 TransportError::INVALID_TOKEN(""),
1001 buf,
1002 )));
1003 }
1004 };
1005
1006 let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
1007 self.index
1008 .insert_initial_incoming(header.dst_cid, incoming_idx);
1009
1010 Some(DatagramEvent::NewConnection(Incoming {
1011 received_at: event.now,
1012 addresses,
1013 ecn: event.ecn,
1014 packet: InitialPacket {
1015 header,
1016 header_data: packet.header_data,
1017 payload: packet.payload,
1018 },
1019 rest: event.remaining,
1020 crypto,
1021 token,
1022 incoming_idx,
1023 improper_drop_warner: IncomingImproperDropWarner,
1024 }))
1025 }
1026
1027 #[allow(clippy::result_large_err)]
1030 pub fn accept(
1031 &mut self,
1032 mut incoming: Incoming,
1033 now: Instant,
1034 buf: &mut Vec<u8>,
1035 server_config: Option<Arc<ServerConfig>>,
1036 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1037 let remote_address_validated = incoming.remote_address_validated();
1038 incoming.improper_drop_warner.dismiss();
1039 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1040 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1041
1042 let packet_number = incoming.packet.header.number.expand(0);
1043 let InitialHeader {
1044 src_cid,
1045 dst_cid,
1046 version,
1047 ..
1048 } = incoming.packet.header;
1049 let server_config =
1050 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
1051
1052 if server_config
1053 .transport
1054 .max_idle_timeout
1055 .is_some_and(|timeout| {
1056 incoming.received_at + Duration::from_millis(timeout.into()) <= now
1057 })
1058 {
1059 debug!("abandoning accept of stale initial");
1060 self.index.remove_initial(dst_cid);
1061 return Err(AcceptError {
1062 cause: ConnectionError::TimedOut,
1063 response: None,
1064 });
1065 }
1066
1067 if self.cids_exhausted() {
1068 debug!("refusing connection");
1069 self.index.remove_initial(dst_cid);
1070 return Err(AcceptError {
1071 cause: ConnectionError::CidsExhausted,
1072 response: Some(self.initial_close(
1073 version,
1074 incoming.addresses,
1075 &incoming.crypto,
1076 &src_cid,
1077 TransportError::CONNECTION_REFUSED(""),
1078 buf,
1079 )),
1080 });
1081 }
1082
1083 if incoming
1084 .crypto
1085 .packet
1086 .remote
1087 .decrypt(
1088 packet_number,
1089 &incoming.packet.header_data,
1090 &mut incoming.packet.payload,
1091 )
1092 .is_err()
1093 {
1094 debug!(packet_number, "failed to authenticate initial packet");
1095 self.index.remove_initial(dst_cid);
1096 return Err(AcceptError {
1097 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1098 response: None,
1099 });
1100 };
1101
1102 let ch = ConnectionHandle(self.connections.vacant_key());
1103 let loc_cid = self.new_cid(ch);
1104 let mut params = TransportParameters::new(
1105 &server_config.transport,
1106 &self.config,
1107 self.local_cid_generator.as_ref(),
1108 loc_cid,
1109 Some(&server_config),
1110 &mut self.rng,
1111 );
1112 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1113 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1114 params.retry_src_cid = incoming.token.retry_src_cid;
1115 let mut pref_addr_cid = None;
1116 if server_config.has_preferred_address() {
1117 let cid = self.new_cid(ch);
1118 pref_addr_cid = Some(cid);
1119 params.preferred_address = Some(PreferredAddress {
1120 address_v4: server_config.preferred_address_v4,
1121 address_v6: server_config.preferred_address_v6,
1122 connection_id: cid,
1123 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1124 });
1125 }
1126
1127 let tls = server_config.crypto.clone().start_session(version, ¶ms);
1128 let transport_config = server_config.transport.clone();
1129 let mut conn = self.add_connection(
1130 ch,
1131 version,
1132 dst_cid,
1133 loc_cid,
1134 src_cid,
1135 incoming.addresses,
1136 incoming.received_at,
1137 tls,
1138 transport_config,
1139 SideArgs::Server {
1140 server_config,
1141 pref_addr_cid,
1142 path_validated: remote_address_validated,
1143 },
1144 );
1145 self.index.insert_initial(dst_cid, ch);
1146
1147 match conn.handle_first_packet(
1148 incoming.received_at,
1149 incoming.addresses.remote,
1150 incoming.ecn,
1151 packet_number,
1152 incoming.packet,
1153 incoming.rest,
1154 ) {
1155 Ok(()) => {
1156 trace!(id = ch.0, icid = %dst_cid, "new connection");
1157
1158 for event in incoming_buffer.datagrams {
1159 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1160 }
1161
1162 Ok((ch, conn))
1163 }
1164 Err(e) => {
1165 debug!("handshake failed: {}", e);
1166 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1167 let response = match e {
1168 ConnectionError::TransportError(ref e) => Some(self.initial_close(
1169 version,
1170 incoming.addresses,
1171 &incoming.crypto,
1172 &src_cid,
1173 e.clone(),
1174 buf,
1175 )),
1176 _ => None,
1177 };
1178 Err(AcceptError { cause: e, response })
1179 }
1180 }
1181 }
1182
1183 fn early_validate_first_packet(
1185 &mut self,
1186 header: &ProtectedInitialHeader,
1187 ) -> Result<(), TransportError> {
1188 let config = &self.server_config.as_ref().unwrap();
1189 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1190 return Err(TransportError::CONNECTION_REFUSED(""));
1191 }
1192
1193 if header.dst_cid.len() < 8
1198 && (header.token_pos.is_empty()
1199 || header.dst_cid.len() != self.local_cid_generator.cid_len())
1200 {
1201 debug!(
1202 "rejecting connection due to invalid DCID length {}",
1203 header.dst_cid.len()
1204 );
1205 return Err(TransportError::PROTOCOL_VIOLATION(
1206 "invalid destination CID length",
1207 ));
1208 }
1209
1210 Ok(())
1211 }
1212
1213 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1215 self.clean_up_incoming(&incoming);
1216 incoming.improper_drop_warner.dismiss();
1217
1218 self.initial_close(
1219 incoming.packet.header.version,
1220 incoming.addresses,
1221 &incoming.crypto,
1222 &incoming.packet.header.src_cid,
1223 TransportError::CONNECTION_REFUSED(""),
1224 buf,
1225 )
1226 }
1227
1228 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1232 if !incoming.may_retry() {
1233 return Err(RetryError(Box::new(incoming)));
1234 }
1235
1236 self.clean_up_incoming(&incoming);
1237 incoming.improper_drop_warner.dismiss();
1238
1239 let server_config = self.server_config.as_ref().unwrap();
1240
1241 let loc_cid = self.local_cid_generator.generate_cid();
1248
1249 let payload = TokenPayload::Retry {
1250 address: incoming.addresses.remote,
1251 orig_dst_cid: incoming.packet.header.dst_cid,
1252 issued: server_config.time_source.now(),
1253 };
1254 let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1255
1256 let header = Header::Retry {
1257 src_cid: loc_cid,
1258 dst_cid: incoming.packet.header.src_cid,
1259 version: incoming.packet.header.version,
1260 };
1261
1262 let encode = header.encode(buf);
1263 buf.put_slice(&token);
1264 buf.extend_from_slice(&server_config.crypto.retry_tag(
1265 incoming.packet.header.version,
1266 &incoming.packet.header.dst_cid,
1267 buf,
1268 ));
1269 encode.finish(buf, &*incoming.crypto.header.local, None);
1270
1271 Ok(Transmit {
1272 destination: incoming.addresses.remote,
1273 ecn: None,
1274 size: buf.len(),
1275 segment_size: None,
1276 src_ip: incoming.addresses.local_ip,
1277 })
1278 }
1279
1280 pub fn ignore(&mut self, incoming: Incoming) {
1285 self.clean_up_incoming(&incoming);
1286 incoming.improper_drop_warner.dismiss();
1287 }
1288
1289 fn clean_up_incoming(&mut self, incoming: &Incoming) {
1291 self.index.remove_initial(incoming.packet.header.dst_cid);
1292 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1293 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1294 }
1295
1296 fn add_connection(
1297 &mut self,
1298 ch: ConnectionHandle,
1299 version: u32,
1300 init_cid: ConnectionId,
1301 loc_cid: ConnectionId,
1302 rem_cid: ConnectionId,
1303 addresses: FourTuple,
1304 now: Instant,
1305 tls: Box<dyn crypto::Session>,
1306 transport_config: Arc<TransportConfig>,
1307 side_args: SideArgs,
1308 ) -> Connection {
1309 let mut rng_seed = [0; 32];
1310 self.rng.fill_bytes(&mut rng_seed);
1311 let side = side_args.side();
1312 let pref_addr_cid = side_args.pref_addr_cid();
1313 let conn = Connection::new(
1314 self.config.clone(),
1315 transport_config,
1316 init_cid,
1317 loc_cid,
1318 rem_cid,
1319 addresses.remote,
1320 addresses.local_ip,
1321 tls,
1322 self.local_cid_generator.as_ref(),
1323 now,
1324 version,
1325 self.allow_mtud,
1326 rng_seed,
1327 side_args,
1328 );
1329
1330 let mut cids_issued = 0;
1331 let mut loc_cids = FxHashMap::default();
1332
1333 loc_cids.insert(cids_issued, loc_cid);
1334 cids_issued += 1;
1335
1336 if let Some(cid) = pref_addr_cid {
1337 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1338 loc_cids.insert(cids_issued, cid);
1339 cids_issued += 1;
1340 }
1341
1342 let id = self.connections.insert(ConnectionMeta {
1343 init_cid,
1344 cids_issued,
1345 loc_cids,
1346 addresses,
1347 side,
1348 reset_token: None,
1349 peer_id: None,
1350 });
1351 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1352
1353 self.index.insert_conn(addresses, loc_cid, ch, side);
1354
1355 conn
1356 }
1357
1358 fn initial_close(
1359 &mut self,
1360 version: u32,
1361 addresses: FourTuple,
1362 crypto: &Keys,
1363 remote_id: &ConnectionId,
1364 reason: TransportError,
1365 buf: &mut Vec<u8>,
1366 ) -> Transmit {
1367 let local_id = self.local_cid_generator.generate_cid();
1371 let number = PacketNumber::U8(0);
1372 let header = Header::Initial(InitialHeader {
1373 dst_cid: *remote_id,
1374 src_cid: local_id,
1375 number,
1376 token: Bytes::new(),
1377 version,
1378 });
1379
1380 let partial_encode = header.encode(buf);
1381 let max_len =
1382 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1383 frame::Close::from(reason).encode(buf, max_len);
1384 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1385 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1386 Transmit {
1387 destination: addresses.remote,
1388 ecn: None,
1389 size: buf.len(),
1390 segment_size: None,
1391 src_ip: addresses.local_ip,
1392 }
1393 }
1394
1395 pub fn config(&self) -> &EndpointConfig {
1397 &self.config
1398 }
1399
1400 pub fn enable_address_discovery(&mut self, enabled: bool) {
1407 self.address_discovery_enabled = enabled;
1408 }
1411
1412 pub fn address_discovery_enabled(&self) -> bool {
1414 self.address_discovery_enabled
1415 }
1416
1417 pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1425 Vec::new()
1427 }
1428
1429 pub fn set_address_change_callback<F>(&mut self, callback: F)
1434 where
1435 F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1436 {
1437 self.address_change_callback = Some(Box::new(callback));
1438 }
1439
1440 pub fn clear_address_change_callback(&mut self) {
1442 self.address_change_callback = None;
1443 }
1444
1445 pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1450 AddressDiscoveryStats::default()
1452 }
1453
1454 pub fn open_connections(&self) -> usize {
1456 self.connections.len()
1457 }
1458
1459 pub fn incoming_buffer_bytes(&self) -> u64 {
1462 self.all_incoming_buffers_total_bytes
1463 }
1464
1465 #[cfg(test)]
1466 pub(crate) fn known_connections(&self) -> usize {
1467 let x = self.connections.len();
1468 debug_assert_eq!(x, self.index.connection_ids_initial.len());
1469 debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1471 debug_assert!(x >= self.index.incoming_connection_remotes.len());
1473 debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1474 x
1475 }
1476
1477 #[cfg(test)]
1478 pub(crate) fn known_cids(&self) -> usize {
1479 self.index.connection_ids.len()
1480 }
1481
1482 fn cids_exhausted(&self) -> bool {
1487 self.local_cid_generator.cid_len() <= 4
1488 && self.local_cid_generator.cid_len() != 0
1489 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1490 - self.index.connection_ids.len())
1491 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1492 }
1493}
1494
1495impl fmt::Debug for Endpoint {
1496 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1497 fmt.debug_struct("Endpoint")
1498 .field("rng", &self.rng)
1499 .field("index", &self.index)
1500 .field("connections", &self.connections)
1501 .field("config", &self.config)
1502 .field("server_config", &self.server_config)
1503 .field("incoming_buffers.len", &self.incoming_buffers.len())
1505 .field(
1506 "all_incoming_buffers_total_bytes",
1507 &self.all_incoming_buffers_total_bytes,
1508 )
1509 .finish()
1510 }
1511}
1512
1513#[derive(Default)]
1515struct IncomingBuffer {
1516 datagrams: Vec<DatagramConnectionEvent>,
1517 total_bytes: u64,
1518}
1519
1520#[derive(Copy, Clone, Debug)]
1522enum RouteDatagramTo {
1523 Incoming(usize),
1524 Connection(ConnectionHandle),
1525}
1526
1527#[derive(Default, Debug)]
1529struct ConnectionIndex {
1530 connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1536 connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1540 incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1544 outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1553 connection_reset_tokens: ResetTokenTable,
1558}
1559
1560impl ConnectionIndex {
1561 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1563 if dst_cid.is_empty() {
1564 return;
1565 }
1566 self.connection_ids_initial
1567 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1568 }
1569
1570 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1572 if dst_cid.is_empty() {
1573 return;
1574 }
1575 let removed = self.connection_ids_initial.remove(&dst_cid);
1576 debug_assert!(removed.is_some());
1577 }
1578
1579 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1581 if dst_cid.is_empty() {
1582 return;
1583 }
1584 self.connection_ids_initial
1585 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1586 }
1587
1588 fn insert_conn(
1591 &mut self,
1592 addresses: FourTuple,
1593 dst_cid: ConnectionId,
1594 connection: ConnectionHandle,
1595 side: Side,
1596 ) {
1597 match dst_cid.len() {
1598 0 => match side {
1599 Side::Server => {
1600 self.incoming_connection_remotes
1601 .insert(addresses, connection);
1602 }
1603 Side::Client => {
1604 self.outgoing_connection_remotes
1605 .insert(addresses.remote, connection);
1606 }
1607 },
1608 _ => {
1609 self.connection_ids.insert(dst_cid, connection);
1610 }
1611 }
1612 }
1613
1614 fn retire(&mut self, dst_cid: ConnectionId) {
1616 self.connection_ids.remove(&dst_cid);
1617 }
1618
1619 fn remove(&mut self, conn: &ConnectionMeta) {
1621 if conn.side.is_server() {
1622 self.remove_initial(conn.init_cid);
1623 }
1624 for cid in conn.loc_cids.values() {
1625 self.connection_ids.remove(cid);
1626 }
1627 self.incoming_connection_remotes.remove(&conn.addresses);
1628 self.outgoing_connection_remotes
1629 .remove(&conn.addresses.remote);
1630 if let Some((remote, token)) = conn.reset_token {
1631 self.connection_reset_tokens.remove(remote, token);
1632 }
1633 }
1634
1635 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1637 let dst_cid = datagram.dst_cid();
1638 let is_empty_cid = dst_cid.is_empty();
1639
1640 if !is_empty_cid {
1642 if let Some(&ch) = self.connection_ids.get(dst_cid) {
1643 return Some(RouteDatagramTo::Connection(ch));
1644 }
1645 }
1646
1647 if datagram.is_initial() || datagram.is_0rtt() {
1649 if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1650 return Some(ch);
1651 }
1652 }
1653
1654 if is_empty_cid {
1656 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1658 return Some(RouteDatagramTo::Connection(ch));
1659 }
1660 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1661 return Some(RouteDatagramTo::Connection(ch));
1662 }
1663 }
1664
1665 let data = datagram.data();
1667 if data.len() < RESET_TOKEN_SIZE {
1668 return None;
1669 }
1670 self.connection_reset_tokens
1671 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1672 .cloned()
1673 .map(RouteDatagramTo::Connection)
1674 }
1675}
1676
1677#[derive(Debug)]
1678pub(crate) struct ConnectionMeta {
1679 init_cid: ConnectionId,
1680 cids_issued: u64,
1682 loc_cids: FxHashMap<u64, ConnectionId>,
1683 addresses: FourTuple,
1688 side: Side,
1689 reset_token: Option<(SocketAddr, ResetToken)>,
1692 peer_id: Option<PeerId>,
1694}
1695
1696#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1698pub struct ConnectionHandle(pub usize);
1699
1700impl From<ConnectionHandle> for usize {
1701 fn from(x: ConnectionHandle) -> Self {
1702 x.0
1703 }
1704}
1705
1706impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1707 type Output = ConnectionMeta;
1708 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1709 &self[ch.0]
1710 }
1711}
1712
1713impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1714 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1715 &mut self[ch.0]
1716 }
1717}
1718
1719pub enum DatagramEvent {
1721 ConnectionEvent(ConnectionHandle, ConnectionEvent),
1723 NewConnection(Incoming),
1725 Response(Transmit),
1727}
1728
1729pub struct Incoming {
1731 received_at: Instant,
1732 addresses: FourTuple,
1733 ecn: Option<EcnCodepoint>,
1734 packet: InitialPacket,
1735 rest: Option<BytesMut>,
1736 crypto: Keys,
1737 token: IncomingToken,
1738 incoming_idx: usize,
1739 improper_drop_warner: IncomingImproperDropWarner,
1740}
1741
1742impl Incoming {
1743 pub fn local_ip(&self) -> Option<IpAddr> {
1747 self.addresses.local_ip
1748 }
1749
1750 pub fn remote_address(&self) -> SocketAddr {
1752 self.addresses.remote
1753 }
1754
1755 pub fn remote_address_validated(&self) -> bool {
1763 self.token.validated
1764 }
1765
1766 pub fn may_retry(&self) -> bool {
1771 self.token.retry_src_cid.is_none()
1772 }
1773
1774 pub fn orig_dst_cid(&self) -> &ConnectionId {
1776 &self.token.orig_dst_cid
1777 }
1778}
1779
1780impl fmt::Debug for Incoming {
1781 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1782 f.debug_struct("Incoming")
1783 .field("addresses", &self.addresses)
1784 .field("ecn", &self.ecn)
1785 .field("token", &self.token)
1788 .field("incoming_idx", &self.incoming_idx)
1789 .finish_non_exhaustive()
1791 }
1792}
1793
1794struct IncomingImproperDropWarner;
1795
1796impl IncomingImproperDropWarner {
1797 fn dismiss(self) {
1798 mem::forget(self);
1799 }
1800}
1801
1802impl Drop for IncomingImproperDropWarner {
1803 fn drop(&mut self) {
1804 warn!(
1805 "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1806 (may cause memory leak and eventual inability to accept new connections)"
1807 );
1808 }
1809}
1810
1811#[derive(Debug, Error, Clone, PartialEq, Eq)]
1815pub enum ConnectError {
1816 #[error("endpoint stopping")]
1820 EndpointStopping,
1821 #[error("CIDs exhausted")]
1825 CidsExhausted,
1826 #[error("invalid server name: {0}")]
1828 InvalidServerName(String),
1829 #[error("invalid remote address: {0}")]
1833 InvalidRemoteAddress(SocketAddr),
1834 #[error("no default client config")]
1838 NoDefaultClientConfig,
1839 #[error("unsupported QUIC version")]
1841 UnsupportedVersion,
1842}
1843
1844#[derive(Debug)]
1846pub struct AcceptError {
1847 pub cause: ConnectionError,
1849 pub response: Option<Transmit>,
1851}
1852
1853#[derive(Debug, Error)]
1855#[error("retry() with validated Incoming")]
1856pub struct RetryError(Box<Incoming>);
1857
1858impl RetryError {
1859 pub fn into_incoming(self) -> Incoming {
1861 *self.0
1862 }
1863}
1864
1865#[derive(Default, Debug)]
1870struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1871
1872impl ResetTokenTable {
1873 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1874 self.0
1875 .entry(remote)
1876 .or_default()
1877 .insert(token, ch)
1878 .is_some()
1879 }
1880
1881 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1882 use std::collections::hash_map::Entry;
1883 match self.0.entry(remote) {
1884 Entry::Vacant(_) => {}
1885 Entry::Occupied(mut e) => {
1886 e.get_mut().remove(&token);
1887 if e.get().is_empty() {
1888 e.remove_entry();
1889 }
1890 }
1891 }
1892 }
1893
1894 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1895 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1896 self.0.get(&remote)?.get(&token)
1897 }
1898}
1899
1900#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
1905struct FourTuple {
1906 remote: SocketAddr,
1907 local_ip: Option<IpAddr>,
1909}