1use std::{
2 collections::{HashMap, VecDeque, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21 Side, Transmit, TransportConfig, TransportError,
22 cid_generator::ConnectionIdGenerator,
23 coding::BufMutExt,
24 config::{ClientConfig, EndpointConfig, ServerConfig},
25 connection::{Connection, ConnectionError, SideArgs},
26 crypto::{self, Keys, UnsupportedVersion},
27 frame,
28 nat_traversal_api::PeerId,
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31 PacketNumber, PartialDecode, ProtectedInitialHeader,
32 },
33 shared::{
34 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35 EndpointEvent, EndpointEventInner, IssuedCid,
36 },
37 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38 transport_parameters::{PreferredAddress, TransportParameters},
39};
40
41#[derive(Debug, Clone)]
43struct RelayQueueItem {
44 target_peer_id: PeerId,
46 frame: frame::PunchMeNow,
48 created_at: Instant,
50 attempts: u32,
52 last_attempt: Option<Instant>,
54}
55
56#[derive(Debug)]
58struct RelayQueue {
59 pending: IndexMap<u64, RelayQueueItem>,
61 next_seq: u64,
63 max_queue_size: usize,
65 request_timeout: Duration,
67 max_retries: u32,
69 retry_interval: Duration,
71 rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
73 max_relays_per_peer: usize,
75 rate_limit_window: Duration,
77}
78
79#[derive(Debug, Default, Clone)]
81pub struct AddressDiscoveryStats {
82 pub frames_sent: u64,
84 pub frames_received: u64,
86 pub addresses_discovered: u64,
88 pub address_changes_detected: u64,
90}
91
92#[derive(Debug, Default)]
94pub struct RelayStats {
95 requests_received: u64,
97 requests_relayed: u64,
99 requests_failed: u64,
101 requests_dropped: u64,
103 requests_timed_out: u64,
105 requests_rate_limited: u64,
107 current_queue_size: usize,
109}
110
111impl RelayQueue {
112 fn new() -> Self {
114 Self {
115 pending: IndexMap::new(),
116 next_seq: 0,
117 max_queue_size: 1000, request_timeout: Duration::from_secs(30), max_retries: 3,
120 retry_interval: Duration::from_millis(500), rate_limiter: HashMap::new(),
122 max_relays_per_peer: 10, rate_limit_window: Duration::from_secs(60), }
125 }
126
127 fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
129 if self.pending.len() >= self.max_queue_size {
131 warn!(
132 "Relay queue full, dropping request for peer {:?}",
133 target_peer_id
134 );
135 return false;
136 }
137
138 if !self.check_rate_limit(target_peer_id, now) {
140 warn!(
141 "Rate limit exceeded for peer {:?}, dropping relay request",
142 target_peer_id
143 );
144 return false;
145 }
146
147 let item = RelayQueueItem {
148 target_peer_id,
149 frame,
150 created_at: now,
151 attempts: 0,
152 last_attempt: None,
153 };
154
155 let seq = self.next_seq;
156 self.next_seq += 1;
157 self.pending.insert(seq, item);
158
159 self.record_relay_request(target_peer_id, now);
161
162 trace!(
163 "Queued relay request for peer {:?}, queue size: {}",
164 target_peer_id,
165 self.pending.len()
166 );
167 true
168 }
169
170 fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
172 self.cleanup_rate_limiter(now);
174
175 if let Some(requests) = self.rate_limiter.get(&peer_id) {
177 requests.len() < self.max_relays_per_peer
178 } else {
179 true }
181 }
182
183 fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
185 self.rate_limiter.entry(peer_id).or_default().push_back(now);
186 }
187
188 fn cleanup_rate_limiter(&mut self, now: Instant) {
190 self.rate_limiter.retain(|_, requests| {
191 requests.retain(|&request_time| {
192 now.saturating_duration_since(request_time) <= self.rate_limit_window
193 });
194 !requests.is_empty()
195 });
196 }
197
198 fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
200 let mut expired_keys = Vec::new();
202 let mut ready_key = None;
203
204 for (seq, item) in &self.pending {
205 if now.saturating_duration_since(item.created_at) > self.request_timeout {
207 expired_keys.push(*seq);
208 continue;
209 }
210
211 if item.attempts == 0
213 || item
214 .last_attempt
215 .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
216 {
217 ready_key = Some(*seq);
218 break;
219 }
220 }
221
222 for key in expired_keys {
224 if let Some(expired) = self.pending.shift_remove(&key) {
225 debug!(
226 "Relay request for peer {:?} timed out after {:?}",
227 expired.target_peer_id,
228 now.saturating_duration_since(expired.created_at)
229 );
230 }
231 }
232
233 if let Some(key) = ready_key {
235 if let Some(mut item) = self.pending.shift_remove(&key) {
236 item.attempts += 1;
237 item.last_attempt = Some(now);
238 return Some(item);
239 }
240 }
241
242 None
243 }
244
245 fn requeue_failed(&mut self, item: RelayQueueItem) {
247 if item.attempts < self.max_retries {
248 trace!(
249 "Requeuing failed relay request for peer {:?}, attempt {}/{}",
250 item.target_peer_id, item.attempts, self.max_retries
251 );
252 let seq = self.next_seq;
253 self.next_seq += 1;
254 self.pending.insert(seq, item);
255 } else {
256 debug!(
257 "Dropping relay request for peer {:?} after {} failed attempts",
258 item.target_peer_id, item.attempts
259 );
260 }
261 }
262
263 fn cleanup_expired(&mut self, now: Instant) -> usize {
265 let initial_len = self.pending.len();
266
267 let expired_keys: Vec<u64> = self
269 .pending
270 .iter()
271 .filter_map(|(seq, item)| {
272 if now.saturating_duration_since(item.created_at) > self.request_timeout {
273 Some(*seq)
274 } else {
275 None
276 }
277 })
278 .collect();
279
280 for key in expired_keys {
282 if let Some(expired) = self.pending.shift_remove(&key) {
283 debug!(
284 "Removing expired relay request for peer {:?}",
285 expired.target_peer_id
286 );
287 }
288 }
289
290 initial_len - self.pending.len()
291 }
292
293 fn len(&self) -> usize {
295 self.pending.len()
296 }
297}
298
299pub struct Endpoint {
304 rng: StdRng,
305 index: ConnectionIndex,
306 connections: Slab<ConnectionMeta>,
307 local_cid_generator: Box<dyn ConnectionIdGenerator>,
308 config: Arc<EndpointConfig>,
309 server_config: Option<Arc<ServerConfig>>,
310 allow_mtud: bool,
312 last_stateless_reset: Option<Instant>,
314 incoming_buffers: Slab<IncomingBuffer>,
316 all_incoming_buffers_total_bytes: u64,
317 peer_connections: HashMap<PeerId, ConnectionHandle>,
319 relay_queue: RelayQueue,
321 relay_stats: RelayStats,
323 address_discovery_enabled: bool,
325 address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
327}
328
329impl Endpoint {
330 pub fn new(
341 config: Arc<EndpointConfig>,
342 server_config: Option<Arc<ServerConfig>>,
343 allow_mtud: bool,
344 rng_seed: Option<[u8; 32]>,
345 ) -> Self {
346 let rng_seed = rng_seed.or(config.rng_seed);
347 Self {
348 rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
349 index: ConnectionIndex::default(),
350 connections: Slab::new(),
351 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
352 config,
353 server_config,
354 allow_mtud,
355 last_stateless_reset: None,
356 incoming_buffers: Slab::new(),
357 all_incoming_buffers_total_bytes: 0,
358 peer_connections: HashMap::new(),
359 relay_queue: RelayQueue::new(),
360 relay_stats: RelayStats::default(),
361 address_discovery_enabled: true, address_change_callback: None,
363 }
364 }
365
366 pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
368 self.server_config = server_config;
369 }
370
371 pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
373 self.peer_connections.insert(peer_id, connection_handle);
374 trace!(
375 "Registered peer {:?} with connection {:?}",
376 peer_id, connection_handle
377 );
378 }
379
380 pub fn unregister_peer(&mut self, peer_id: &PeerId) {
382 if let Some(handle) = self.peer_connections.remove(peer_id) {
383 trace!(
384 "Unregistered peer {:?} from connection {:?}",
385 peer_id, handle
386 );
387 }
388 }
389
390 pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
392 self.peer_connections.get(peer_id).copied()
393 }
394
395 pub(crate) fn queue_frame_for_peer(
397 &mut self,
398 peer_id: &PeerId,
399 frame: frame::PunchMeNow,
400 ) -> bool {
401 self.relay_stats.requests_received += 1;
402
403 if let Some(ch) = self.lookup_peer_connection(peer_id) {
404 if self.relay_frame_to_connection(ch, frame.clone()) {
406 self.relay_stats.requests_relayed += 1;
407 trace!(
408 "Immediately relayed frame to peer {:?} via connection {:?}",
409 peer_id, ch
410 );
411 return true;
412 }
413 }
414
415 let now = Instant::now();
417 if self.relay_queue.enqueue(*peer_id, frame, now) {
418 self.relay_stats.current_queue_size = self.relay_queue.len();
419 trace!("Queued relay request for peer {:?}", peer_id);
420 true
421 } else {
422 if !self.relay_queue.check_rate_limit(*peer_id, now) {
424 self.relay_stats.requests_rate_limited += 1;
425 } else {
426 self.relay_stats.requests_dropped += 1;
427 }
428 false
429 }
430 }
431
432 fn relay_frame_to_connection(
434 &mut self,
435 ch: ConnectionHandle,
436 _frame: frame::PunchMeNow,
437 ) -> bool {
438 trace!("Would relay frame to connection {:?}", ch);
444 true
445 }
446
447 pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
449 if let Some(connection) = self.connections.get_mut(connection_handle.0) {
450 connection.peer_id = Some(peer_id);
451 self.register_peer(peer_id, connection_handle);
452
453 self.process_queued_relays_for_peer(peer_id);
455 }
456 }
457
458 fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
460 let _now = Instant::now();
461 let mut processed = 0;
462
463 let mut items_to_process = Vec::new();
465 let mut keys_to_remove = Vec::new();
466
467 for (seq, item) in &self.relay_queue.pending {
469 if item.target_peer_id == peer_id {
470 items_to_process.push(item.clone());
471 keys_to_remove.push(*seq);
472 }
473 }
474
475 for key in keys_to_remove {
477 self.relay_queue.pending.shift_remove(&key);
478 }
479
480 for item in items_to_process {
482 if let Some(ch) = self.lookup_peer_connection(&peer_id) {
483 if self.relay_frame_to_connection(ch, item.frame.clone()) {
484 self.relay_stats.requests_relayed += 1;
485 processed += 1;
486 trace!("Processed queued relay for peer {:?}", peer_id);
487 } else {
488 self.relay_queue.requeue_failed(item);
490 self.relay_stats.requests_failed += 1;
491 }
492 }
493 }
494
495 self.relay_stats.current_queue_size = self.relay_queue.len();
496
497 if processed > 0 {
498 debug!(
499 "Processed {} queued relay requests for peer {:?}",
500 processed, peer_id
501 );
502 }
503 }
504
505 pub fn process_relay_queue(&mut self) {
507 let now = Instant::now();
508 let mut processed = 0;
509 let mut failed = 0;
510
511 while let Some(item) = self.relay_queue.next_ready(now) {
513 if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
514 if self.relay_frame_to_connection(ch, item.frame.clone()) {
515 self.relay_stats.requests_relayed += 1;
516 processed += 1;
517 trace!(
518 "Successfully relayed frame to peer {:?}",
519 item.target_peer_id
520 );
521 } else {
522 self.relay_queue.requeue_failed(item);
524 self.relay_stats.requests_failed += 1;
525 failed += 1;
526 }
527 } else {
528 self.relay_queue.requeue_failed(item);
530 failed += 1;
531 }
532 }
533
534 let expired = self.relay_queue.cleanup_expired(now);
536 if expired > 0 {
537 self.relay_stats.requests_timed_out += expired as u64;
538 debug!("Cleaned up {} expired relay requests", expired);
539 }
540
541 self.relay_stats.current_queue_size = self.relay_queue.len();
542
543 if processed > 0 || failed > 0 {
544 trace!(
545 "Relay queue processing: {} processed, {} failed, {} in queue",
546 processed,
547 failed,
548 self.relay_queue.len()
549 );
550 }
551 }
552
553 pub fn relay_stats(&self) -> &RelayStats {
555 &self.relay_stats
556 }
557
558 pub fn relay_queue_len(&self) -> usize {
560 self.relay_queue.len()
561 }
562
563 pub fn handle_event(
567 &mut self,
568 ch: ConnectionHandle,
569 event: EndpointEvent,
570 ) -> Option<ConnectionEvent> {
571 use EndpointEventInner::*;
572 match event.0 {
573 EndpointEventInner::NeedIdentifiers(now, n) => {
574 return Some(self.send_new_identifiers(now, ch, n));
575 }
576 ResetToken(remote, token) => {
577 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
578 self.index.connection_reset_tokens.remove(old.0, old.1);
579 }
580 if self.index.connection_reset_tokens.insert(remote, token, ch) {
581 warn!("duplicate reset token");
582 }
583 }
584 RetireConnectionId(now, seq, allow_more_cids) => {
585 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
586 trace!("peer retired CID {}: {}", seq, cid);
587 self.index.retire(cid);
588 if allow_more_cids {
589 return Some(self.send_new_identifiers(now, ch, 1));
590 }
591 }
592 }
593 RelayPunchMeNow(target_peer_id, punch_me_now) => {
594 let peer_id = PeerId(target_peer_id);
596 if self.queue_frame_for_peer(&peer_id, punch_me_now) {
597 trace!(
598 "Successfully queued PunchMeNow frame for relay to peer {:?}",
599 peer_id
600 );
601 } else {
602 warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
603 }
604 }
605 SendAddressFrame(add_address_frame) => {
606 trace!(
608 "Sending ADD_ADDRESS frame: seq={}, addr={}, priority={}",
609 add_address_frame.sequence,
610 add_address_frame.address,
611 add_address_frame.priority
612 );
613
614 debug!(
617 "ADD_ADDRESS frame ready for transmission: {:?}",
618 add_address_frame
619 );
620 }
621 NatCandidateValidated { address, challenge } => {
622 trace!(
624 "NAT candidate validation succeeded for {} with challenge {:016x}",
625 address, challenge
626 );
627
628 debug!("NAT candidate {} validated successfully", address);
632 }
633 Drained => {
634 if let Some(conn) = self.connections.try_remove(ch.0) {
635 self.index.remove(&conn);
636 if let Some(peer_id) = conn.peer_id {
638 self.peer_connections.remove(&peer_id);
639 trace!("Cleaned up peer connection mapping for {:?}", peer_id);
640 }
641 } else {
642 error!(id = ch.0, "unknown connection drained");
646 }
647 }
648 }
649 None
650 }
651
652 pub fn handle(
654 &mut self,
655 now: Instant,
656 remote: SocketAddr,
657 local_ip: Option<IpAddr>,
658 ecn: Option<EcnCodepoint>,
659 data: BytesMut,
660 buf: &mut Vec<u8>,
661 ) -> Option<DatagramEvent> {
662 let datagram_len = data.len();
664 let event = match PartialDecode::new(
665 data,
666 &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
667 &self.config.supported_versions,
668 self.config.grease_quic_bit,
669 ) {
670 Ok((first_decode, remaining)) => DatagramConnectionEvent {
671 now,
672 remote,
673 ecn,
674 first_decode,
675 remaining,
676 },
677 Err(PacketDecodeError::UnsupportedVersion {
678 src_cid,
679 dst_cid,
680 version,
681 }) => {
682 if self.server_config.is_none() {
683 debug!("dropping packet with unsupported version");
684 return None;
685 }
686 trace!("sending version negotiation");
687 Header::VersionNegotiate {
689 random: self.rng.r#gen::<u8>() | 0x40,
690 src_cid: dst_cid,
691 dst_cid: src_cid,
692 }
693 .encode(buf);
694 buf.write::<u32>(match version {
696 0x0a1a_2a3a => 0x0a1a_2a4a,
697 _ => 0x0a1a_2a3a,
698 });
699 for &version in &self.config.supported_versions {
700 buf.write(version);
701 }
702 return Some(DatagramEvent::Response(Transmit {
703 destination: remote,
704 ecn: None,
705 size: buf.len(),
706 segment_size: None,
707 src_ip: local_ip,
708 }));
709 }
710 Err(e) => {
711 trace!("malformed header: {}", e);
712 return None;
713 }
714 };
715
716 let addresses = FourTuple { remote, local_ip };
717 let dst_cid = event.first_decode.dst_cid();
718
719 if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
720 match route_to {
722 RouteDatagramTo::Incoming(incoming_idx) => {
723 let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
724 let config = &self.server_config.as_ref().unwrap();
725
726 if incoming_buffer
727 .total_bytes
728 .checked_add(datagram_len as u64)
729 .is_some_and(|n| n <= config.incoming_buffer_size)
730 && self
731 .all_incoming_buffers_total_bytes
732 .checked_add(datagram_len as u64)
733 .is_some_and(|n| n <= config.incoming_buffer_size_total)
734 {
735 incoming_buffer.datagrams.push(event);
736 incoming_buffer.total_bytes += datagram_len as u64;
737 self.all_incoming_buffers_total_bytes += datagram_len as u64;
738 }
739
740 None
741 }
742 RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
743 ch,
744 ConnectionEvent(ConnectionEventInner::Datagram(event)),
745 )),
746 }
747 } else if event.first_decode.initial_header().is_some() {
748 self.handle_first_packet(datagram_len, event, addresses, buf)
751 } else if event.first_decode.has_long_header() {
752 debug!(
753 "ignoring non-initial packet for unknown connection {}",
754 dst_cid
755 );
756 None
757 } else if !event.first_decode.is_initial()
758 && self.local_cid_generator.validate(dst_cid).is_err()
759 {
760 debug!("dropping packet with invalid CID");
764 None
765 } else if dst_cid.is_empty() {
766 trace!("dropping unrecognized short packet without ID");
767 None
768 } else {
769 self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
770 .map(DatagramEvent::Response)
771 }
772 }
773
774 fn stateless_reset(
775 &mut self,
776 now: Instant,
777 inciting_dgram_len: usize,
778 addresses: FourTuple,
779 dst_cid: ConnectionId,
780 buf: &mut Vec<u8>,
781 ) -> Option<Transmit> {
782 if self
783 .last_stateless_reset
784 .is_some_and(|last| last + self.config.min_reset_interval > now)
785 {
786 debug!("ignoring unexpected packet within minimum stateless reset interval");
787 return None;
788 }
789
790 const MIN_PADDING_LEN: usize = 5;
792
793 let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
796 Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
797 _ => {
798 debug!(
799 "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
800 inciting_dgram_len
801 );
802 return None;
803 }
804 };
805
806 debug!(
807 "sending stateless reset for {} to {}",
808 dst_cid, addresses.remote
809 );
810 self.last_stateless_reset = Some(now);
811 const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
813 let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
814 max_padding_len
815 } else {
816 self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
817 };
818 buf.reserve(padding_len + RESET_TOKEN_SIZE);
819 buf.resize(padding_len, 0);
820 self.rng.fill_bytes(&mut buf[0..padding_len]);
821 buf[0] = 0b0100_0000 | (buf[0] >> 2);
822 buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
823
824 debug_assert!(buf.len() < inciting_dgram_len);
825
826 Some(Transmit {
827 destination: addresses.remote,
828 ecn: None,
829 size: buf.len(),
830 segment_size: None,
831 src_ip: addresses.local_ip,
832 })
833 }
834
835 pub fn connect(
837 &mut self,
838 now: Instant,
839 config: ClientConfig,
840 remote: SocketAddr,
841 server_name: &str,
842 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
843 if self.cids_exhausted() {
844 return Err(ConnectError::CidsExhausted);
845 }
846 if remote.port() == 0 || remote.ip().is_unspecified() {
847 return Err(ConnectError::InvalidRemoteAddress(remote));
848 }
849 if !self.config.supported_versions.contains(&config.version) {
850 return Err(ConnectError::UnsupportedVersion);
851 }
852
853 let remote_id = (config.initial_dst_cid_provider)();
854 trace!(initial_dcid = %remote_id);
855
856 let ch = ConnectionHandle(self.connections.vacant_key());
857 let loc_cid = self.new_cid(ch);
858 let params = TransportParameters::new(
859 &config.transport,
860 &self.config,
861 self.local_cid_generator.as_ref(),
862 loc_cid,
863 None,
864 &mut self.rng,
865 );
866 let tls = config
867 .crypto
868 .start_session(config.version, server_name, ¶ms)?;
869
870 let conn = self.add_connection(
871 ch,
872 config.version,
873 remote_id,
874 loc_cid,
875 remote_id,
876 FourTuple {
877 remote,
878 local_ip: None,
879 },
880 now,
881 tls,
882 config.transport,
883 SideArgs::Client {
884 token_store: config.token_store,
885 server_name: server_name.into(),
886 },
887 );
888 Ok((ch, conn))
889 }
890
891 fn send_new_identifiers(
892 &mut self,
893 now: Instant,
894 ch: ConnectionHandle,
895 num: u64,
896 ) -> ConnectionEvent {
897 let mut ids = vec![];
898 for _ in 0..num {
899 let id = self.new_cid(ch);
900 let meta = &mut self.connections[ch];
901 let sequence = meta.cids_issued;
902 meta.cids_issued += 1;
903 meta.loc_cids.insert(sequence, id);
904 ids.push(IssuedCid {
905 sequence,
906 id,
907 reset_token: ResetToken::new(&*self.config.reset_key, id),
908 });
909 }
910 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
911 }
912
913 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
915 loop {
916 let cid = self.local_cid_generator.generate_cid();
917 if cid.is_empty() {
918 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
920 return cid;
921 }
922 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
923 e.insert(ch);
924 break cid;
925 }
926 }
927 }
928
929 fn handle_first_packet(
930 &mut self,
931 datagram_len: usize,
932 event: DatagramConnectionEvent,
933 addresses: FourTuple,
934 buf: &mut Vec<u8>,
935 ) -> Option<DatagramEvent> {
936 let dst_cid = event.first_decode.dst_cid();
937 let header = event.first_decode.initial_header().unwrap();
938
939 let Some(server_config) = &self.server_config else {
940 debug!("packet for unrecognized connection {}", dst_cid);
941 return self
942 .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
943 .map(DatagramEvent::Response);
944 };
945
946 if datagram_len < MIN_INITIAL_SIZE as usize {
947 debug!("ignoring short initial for connection {}", dst_cid);
948 return None;
949 }
950
951 let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
952 Ok(keys) => keys,
953 Err(UnsupportedVersion) => {
954 debug!(
957 "ignoring initial packet version {:#x} unsupported by cryptographic layer",
958 header.version
959 );
960 return None;
961 }
962 };
963
964 if let Err(reason) = self.early_validate_first_packet(header) {
965 return Some(DatagramEvent::Response(self.initial_close(
966 header.version,
967 addresses,
968 &crypto,
969 &header.src_cid,
970 reason,
971 buf,
972 )));
973 }
974
975 let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
976 Ok(packet) => packet,
977 Err(e) => {
978 trace!("unable to decode initial packet: {}", e);
979 return None;
980 }
981 };
982
983 if !packet.reserved_bits_valid() {
984 debug!("dropping connection attempt with invalid reserved bits");
985 return None;
986 }
987
988 let Header::Initial(header) = packet.header else {
989 panic!("non-initial packet in handle_first_packet()");
990 };
991
992 let server_config = self.server_config.as_ref().unwrap().clone();
993
994 let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
995 Ok(token) => token,
996 Err(InvalidRetryTokenError) => {
997 debug!("rejecting invalid retry token");
998 return Some(DatagramEvent::Response(self.initial_close(
999 header.version,
1000 addresses,
1001 &crypto,
1002 &header.src_cid,
1003 TransportError::INVALID_TOKEN(""),
1004 buf,
1005 )));
1006 }
1007 };
1008
1009 let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
1010 self.index
1011 .insert_initial_incoming(header.dst_cid, incoming_idx);
1012
1013 Some(DatagramEvent::NewConnection(Incoming {
1014 received_at: event.now,
1015 addresses,
1016 ecn: event.ecn,
1017 packet: InitialPacket {
1018 header,
1019 header_data: packet.header_data,
1020 payload: packet.payload,
1021 },
1022 rest: event.remaining,
1023 crypto,
1024 token,
1025 incoming_idx,
1026 improper_drop_warner: IncomingImproperDropWarner,
1027 }))
1028 }
1029
1030 #[allow(clippy::result_large_err)]
1033 pub fn accept(
1034 &mut self,
1035 mut incoming: Incoming,
1036 now: Instant,
1037 buf: &mut Vec<u8>,
1038 server_config: Option<Arc<ServerConfig>>,
1039 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1040 let remote_address_validated = incoming.remote_address_validated();
1041 incoming.improper_drop_warner.dismiss();
1042 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1043 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1044
1045 let packet_number = incoming.packet.header.number.expand(0);
1046 let InitialHeader {
1047 src_cid,
1048 dst_cid,
1049 version,
1050 ..
1051 } = incoming.packet.header;
1052 let server_config =
1053 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
1054
1055 if server_config
1056 .transport
1057 .max_idle_timeout
1058 .is_some_and(|timeout| {
1059 incoming.received_at + Duration::from_millis(timeout.into()) <= now
1060 })
1061 {
1062 debug!("abandoning accept of stale initial");
1063 self.index.remove_initial(dst_cid);
1064 return Err(AcceptError {
1065 cause: ConnectionError::TimedOut,
1066 response: None,
1067 });
1068 }
1069
1070 if self.cids_exhausted() {
1071 debug!("refusing connection");
1072 self.index.remove_initial(dst_cid);
1073 return Err(AcceptError {
1074 cause: ConnectionError::CidsExhausted,
1075 response: Some(self.initial_close(
1076 version,
1077 incoming.addresses,
1078 &incoming.crypto,
1079 &src_cid,
1080 TransportError::CONNECTION_REFUSED(""),
1081 buf,
1082 )),
1083 });
1084 }
1085
1086 if incoming
1087 .crypto
1088 .packet
1089 .remote
1090 .decrypt(
1091 packet_number,
1092 &incoming.packet.header_data,
1093 &mut incoming.packet.payload,
1094 )
1095 .is_err()
1096 {
1097 debug!(packet_number, "failed to authenticate initial packet");
1098 self.index.remove_initial(dst_cid);
1099 return Err(AcceptError {
1100 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1101 response: None,
1102 });
1103 };
1104
1105 let ch = ConnectionHandle(self.connections.vacant_key());
1106 let loc_cid = self.new_cid(ch);
1107 let mut params = TransportParameters::new(
1108 &server_config.transport,
1109 &self.config,
1110 self.local_cid_generator.as_ref(),
1111 loc_cid,
1112 Some(&server_config),
1113 &mut self.rng,
1114 );
1115 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1116 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1117 params.retry_src_cid = incoming.token.retry_src_cid;
1118 let mut pref_addr_cid = None;
1119 if server_config.has_preferred_address() {
1120 let cid = self.new_cid(ch);
1121 pref_addr_cid = Some(cid);
1122 params.preferred_address = Some(PreferredAddress {
1123 address_v4: server_config.preferred_address_v4,
1124 address_v6: server_config.preferred_address_v6,
1125 connection_id: cid,
1126 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1127 });
1128 }
1129
1130 let tls = server_config.crypto.clone().start_session(version, ¶ms);
1131 let transport_config = server_config.transport.clone();
1132 let mut conn = self.add_connection(
1133 ch,
1134 version,
1135 dst_cid,
1136 loc_cid,
1137 src_cid,
1138 incoming.addresses,
1139 incoming.received_at,
1140 tls,
1141 transport_config,
1142 SideArgs::Server {
1143 server_config,
1144 pref_addr_cid,
1145 path_validated: remote_address_validated,
1146 },
1147 );
1148 self.index.insert_initial(dst_cid, ch);
1149
1150 match conn.handle_first_packet(
1151 incoming.received_at,
1152 incoming.addresses.remote,
1153 incoming.ecn,
1154 packet_number,
1155 incoming.packet,
1156 incoming.rest,
1157 ) {
1158 Ok(()) => {
1159 trace!(id = ch.0, icid = %dst_cid, "new connection");
1160
1161 for event in incoming_buffer.datagrams {
1162 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1163 }
1164
1165 Ok((ch, conn))
1166 }
1167 Err(e) => {
1168 debug!("handshake failed: {}", e);
1169 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1170 let response = match e {
1171 ConnectionError::TransportError(ref e) => Some(self.initial_close(
1172 version,
1173 incoming.addresses,
1174 &incoming.crypto,
1175 &src_cid,
1176 e.clone(),
1177 buf,
1178 )),
1179 _ => None,
1180 };
1181 Err(AcceptError { cause: e, response })
1182 }
1183 }
1184 }
1185
1186 fn early_validate_first_packet(
1188 &mut self,
1189 header: &ProtectedInitialHeader,
1190 ) -> Result<(), TransportError> {
1191 let config = &self.server_config.as_ref().unwrap();
1192 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1193 return Err(TransportError::CONNECTION_REFUSED(""));
1194 }
1195
1196 if header.dst_cid.len() < 8
1201 && (header.token_pos.is_empty()
1202 || header.dst_cid.len() != self.local_cid_generator.cid_len())
1203 {
1204 debug!(
1205 "rejecting connection due to invalid DCID length {}",
1206 header.dst_cid.len()
1207 );
1208 return Err(TransportError::PROTOCOL_VIOLATION(
1209 "invalid destination CID length",
1210 ));
1211 }
1212
1213 Ok(())
1214 }
1215
1216 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1218 self.clean_up_incoming(&incoming);
1219 incoming.improper_drop_warner.dismiss();
1220
1221 self.initial_close(
1222 incoming.packet.header.version,
1223 incoming.addresses,
1224 &incoming.crypto,
1225 &incoming.packet.header.src_cid,
1226 TransportError::CONNECTION_REFUSED(""),
1227 buf,
1228 )
1229 }
1230
1231 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1235 if !incoming.may_retry() {
1236 return Err(RetryError(Box::new(incoming)));
1237 }
1238
1239 self.clean_up_incoming(&incoming);
1240 incoming.improper_drop_warner.dismiss();
1241
1242 let server_config = self.server_config.as_ref().unwrap();
1243
1244 let loc_cid = self.local_cid_generator.generate_cid();
1251
1252 let payload = TokenPayload::Retry {
1253 address: incoming.addresses.remote,
1254 orig_dst_cid: incoming.packet.header.dst_cid,
1255 issued: server_config.time_source.now(),
1256 };
1257 let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1258
1259 let header = Header::Retry {
1260 src_cid: loc_cid,
1261 dst_cid: incoming.packet.header.src_cid,
1262 version: incoming.packet.header.version,
1263 };
1264
1265 let encode = header.encode(buf);
1266 buf.put_slice(&token);
1267 buf.extend_from_slice(&server_config.crypto.retry_tag(
1268 incoming.packet.header.version,
1269 &incoming.packet.header.dst_cid,
1270 buf,
1271 ));
1272 encode.finish(buf, &*incoming.crypto.header.local, None);
1273
1274 Ok(Transmit {
1275 destination: incoming.addresses.remote,
1276 ecn: None,
1277 size: buf.len(),
1278 segment_size: None,
1279 src_ip: incoming.addresses.local_ip,
1280 })
1281 }
1282
1283 pub fn ignore(&mut self, incoming: Incoming) {
1288 self.clean_up_incoming(&incoming);
1289 incoming.improper_drop_warner.dismiss();
1290 }
1291
1292 fn clean_up_incoming(&mut self, incoming: &Incoming) {
1294 self.index.remove_initial(incoming.packet.header.dst_cid);
1295 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1296 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1297 }
1298
1299 fn add_connection(
1300 &mut self,
1301 ch: ConnectionHandle,
1302 version: u32,
1303 init_cid: ConnectionId,
1304 loc_cid: ConnectionId,
1305 rem_cid: ConnectionId,
1306 addresses: FourTuple,
1307 now: Instant,
1308 tls: Box<dyn crypto::Session>,
1309 transport_config: Arc<TransportConfig>,
1310 side_args: SideArgs,
1311 ) -> Connection {
1312 let mut rng_seed = [0; 32];
1313 self.rng.fill_bytes(&mut rng_seed);
1314 let side = side_args.side();
1315 let pref_addr_cid = side_args.pref_addr_cid();
1316 let conn = Connection::new(
1317 self.config.clone(),
1318 transport_config,
1319 init_cid,
1320 loc_cid,
1321 rem_cid,
1322 addresses.remote,
1323 addresses.local_ip,
1324 tls,
1325 self.local_cid_generator.as_ref(),
1326 now,
1327 version,
1328 self.allow_mtud,
1329 rng_seed,
1330 side_args,
1331 );
1332
1333 let mut cids_issued = 0;
1334 let mut loc_cids = FxHashMap::default();
1335
1336 loc_cids.insert(cids_issued, loc_cid);
1337 cids_issued += 1;
1338
1339 if let Some(cid) = pref_addr_cid {
1340 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1341 loc_cids.insert(cids_issued, cid);
1342 cids_issued += 1;
1343 }
1344
1345 let id = self.connections.insert(ConnectionMeta {
1346 init_cid,
1347 cids_issued,
1348 loc_cids,
1349 addresses,
1350 side,
1351 reset_token: None,
1352 peer_id: None,
1353 });
1354 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1355
1356 self.index.insert_conn(addresses, loc_cid, ch, side);
1357
1358 conn
1359 }
1360
1361 fn initial_close(
1362 &mut self,
1363 version: u32,
1364 addresses: FourTuple,
1365 crypto: &Keys,
1366 remote_id: &ConnectionId,
1367 reason: TransportError,
1368 buf: &mut Vec<u8>,
1369 ) -> Transmit {
1370 let local_id = self.local_cid_generator.generate_cid();
1374 let number = PacketNumber::U8(0);
1375 let header = Header::Initial(InitialHeader {
1376 dst_cid: *remote_id,
1377 src_cid: local_id,
1378 number,
1379 token: Bytes::new(),
1380 version,
1381 });
1382
1383 let partial_encode = header.encode(buf);
1384 let max_len =
1385 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1386 frame::Close::from(reason).encode(buf, max_len);
1387 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1388 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1389 Transmit {
1390 destination: addresses.remote,
1391 ecn: None,
1392 size: buf.len(),
1393 segment_size: None,
1394 src_ip: addresses.local_ip,
1395 }
1396 }
1397
1398 pub fn config(&self) -> &EndpointConfig {
1400 &self.config
1401 }
1402
1403 pub fn enable_address_discovery(&mut self, enabled: bool) {
1410 self.address_discovery_enabled = enabled;
1411 }
1414
1415 pub fn address_discovery_enabled(&self) -> bool {
1417 self.address_discovery_enabled
1418 }
1419
1420 pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1428 Vec::new()
1430 }
1431
1432 pub fn set_address_change_callback<F>(&mut self, callback: F)
1437 where
1438 F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1439 {
1440 self.address_change_callback = Some(Box::new(callback));
1441 }
1442
1443 pub fn clear_address_change_callback(&mut self) {
1445 self.address_change_callback = None;
1446 }
1447
1448 pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1453 AddressDiscoveryStats::default()
1455 }
1456
1457 pub fn open_connections(&self) -> usize {
1459 self.connections.len()
1460 }
1461
1462 pub fn incoming_buffer_bytes(&self) -> u64 {
1465 self.all_incoming_buffers_total_bytes
1466 }
1467
1468 #[cfg(test)]
1469 pub(crate) fn known_connections(&self) -> usize {
1470 let x = self.connections.len();
1471 debug_assert_eq!(x, self.index.connection_ids_initial.len());
1472 debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1474 debug_assert!(x >= self.index.incoming_connection_remotes.len());
1476 debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1477 x
1478 }
1479
1480 #[cfg(test)]
1481 pub(crate) fn known_cids(&self) -> usize {
1482 self.index.connection_ids.len()
1483 }
1484
1485 fn cids_exhausted(&self) -> bool {
1490 self.local_cid_generator.cid_len() <= 4
1491 && self.local_cid_generator.cid_len() != 0
1492 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1493 - self.index.connection_ids.len())
1494 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1495 }
1496}
1497
1498impl fmt::Debug for Endpoint {
1499 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1500 fmt.debug_struct("Endpoint")
1501 .field("rng", &self.rng)
1502 .field("index", &self.index)
1503 .field("connections", &self.connections)
1504 .field("config", &self.config)
1505 .field("server_config", &self.server_config)
1506 .field("incoming_buffers.len", &self.incoming_buffers.len())
1508 .field(
1509 "all_incoming_buffers_total_bytes",
1510 &self.all_incoming_buffers_total_bytes,
1511 )
1512 .finish()
1513 }
1514}
1515
1516#[derive(Default)]
1518struct IncomingBuffer {
1519 datagrams: Vec<DatagramConnectionEvent>,
1520 total_bytes: u64,
1521}
1522
1523#[derive(Copy, Clone, Debug)]
1525enum RouteDatagramTo {
1526 Incoming(usize),
1527 Connection(ConnectionHandle),
1528}
1529
1530#[derive(Default, Debug)]
1532struct ConnectionIndex {
1533 connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1539 connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1543 incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1547 outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1556 connection_reset_tokens: ResetTokenTable,
1561}
1562
1563impl ConnectionIndex {
1564 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1566 if dst_cid.is_empty() {
1567 return;
1568 }
1569 self.connection_ids_initial
1570 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1571 }
1572
1573 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1575 if dst_cid.is_empty() {
1576 return;
1577 }
1578 let removed = self.connection_ids_initial.remove(&dst_cid);
1579 debug_assert!(removed.is_some());
1580 }
1581
1582 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1584 if dst_cid.is_empty() {
1585 return;
1586 }
1587 self.connection_ids_initial
1588 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1589 }
1590
1591 fn insert_conn(
1594 &mut self,
1595 addresses: FourTuple,
1596 dst_cid: ConnectionId,
1597 connection: ConnectionHandle,
1598 side: Side,
1599 ) {
1600 match dst_cid.len() {
1601 0 => match side {
1602 Side::Server => {
1603 self.incoming_connection_remotes
1604 .insert(addresses, connection);
1605 }
1606 Side::Client => {
1607 self.outgoing_connection_remotes
1608 .insert(addresses.remote, connection);
1609 }
1610 },
1611 _ => {
1612 self.connection_ids.insert(dst_cid, connection);
1613 }
1614 }
1615 }
1616
1617 fn retire(&mut self, dst_cid: ConnectionId) {
1619 self.connection_ids.remove(&dst_cid);
1620 }
1621
1622 fn remove(&mut self, conn: &ConnectionMeta) {
1624 if conn.side.is_server() {
1625 self.remove_initial(conn.init_cid);
1626 }
1627 for cid in conn.loc_cids.values() {
1628 self.connection_ids.remove(cid);
1629 }
1630 self.incoming_connection_remotes.remove(&conn.addresses);
1631 self.outgoing_connection_remotes
1632 .remove(&conn.addresses.remote);
1633 if let Some((remote, token)) = conn.reset_token {
1634 self.connection_reset_tokens.remove(remote, token);
1635 }
1636 }
1637
1638 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1640 let dst_cid = datagram.dst_cid();
1641 let is_empty_cid = dst_cid.is_empty();
1642
1643 if !is_empty_cid {
1645 if let Some(&ch) = self.connection_ids.get(dst_cid) {
1646 return Some(RouteDatagramTo::Connection(ch));
1647 }
1648 }
1649
1650 if datagram.is_initial() || datagram.is_0rtt() {
1652 if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1653 return Some(ch);
1654 }
1655 }
1656
1657 if is_empty_cid {
1659 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1661 return Some(RouteDatagramTo::Connection(ch));
1662 }
1663 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1664 return Some(RouteDatagramTo::Connection(ch));
1665 }
1666 }
1667
1668 let data = datagram.data();
1670 if data.len() < RESET_TOKEN_SIZE {
1671 return None;
1672 }
1673 self.connection_reset_tokens
1674 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1675 .cloned()
1676 .map(RouteDatagramTo::Connection)
1677 }
1678}
1679
1680#[derive(Debug)]
1681pub(crate) struct ConnectionMeta {
1682 init_cid: ConnectionId,
1683 cids_issued: u64,
1685 loc_cids: FxHashMap<u64, ConnectionId>,
1686 addresses: FourTuple,
1691 side: Side,
1692 reset_token: Option<(SocketAddr, ResetToken)>,
1695 peer_id: Option<PeerId>,
1697}
1698
1699#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1701pub struct ConnectionHandle(pub usize);
1702
1703impl From<ConnectionHandle> for usize {
1704 fn from(x: ConnectionHandle) -> Self {
1705 x.0
1706 }
1707}
1708
1709impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1710 type Output = ConnectionMeta;
1711 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1712 &self[ch.0]
1713 }
1714}
1715
1716impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1717 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1718 &mut self[ch.0]
1719 }
1720}
1721
1722pub enum DatagramEvent {
1724 ConnectionEvent(ConnectionHandle, ConnectionEvent),
1726 NewConnection(Incoming),
1728 Response(Transmit),
1730}
1731
1732pub struct Incoming {
1734 received_at: Instant,
1735 addresses: FourTuple,
1736 ecn: Option<EcnCodepoint>,
1737 packet: InitialPacket,
1738 rest: Option<BytesMut>,
1739 crypto: Keys,
1740 token: IncomingToken,
1741 incoming_idx: usize,
1742 improper_drop_warner: IncomingImproperDropWarner,
1743}
1744
1745impl Incoming {
1746 pub fn local_ip(&self) -> Option<IpAddr> {
1750 self.addresses.local_ip
1751 }
1752
1753 pub fn remote_address(&self) -> SocketAddr {
1755 self.addresses.remote
1756 }
1757
1758 pub fn remote_address_validated(&self) -> bool {
1766 self.token.validated
1767 }
1768
1769 pub fn may_retry(&self) -> bool {
1774 self.token.retry_src_cid.is_none()
1775 }
1776
1777 pub fn orig_dst_cid(&self) -> &ConnectionId {
1779 &self.token.orig_dst_cid
1780 }
1781}
1782
1783impl fmt::Debug for Incoming {
1784 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1785 f.debug_struct("Incoming")
1786 .field("addresses", &self.addresses)
1787 .field("ecn", &self.ecn)
1788 .field("token", &self.token)
1791 .field("incoming_idx", &self.incoming_idx)
1792 .finish_non_exhaustive()
1794 }
1795}
1796
1797struct IncomingImproperDropWarner;
1798
1799impl IncomingImproperDropWarner {
1800 fn dismiss(self) {
1801 mem::forget(self);
1802 }
1803}
1804
1805impl Drop for IncomingImproperDropWarner {
1806 fn drop(&mut self) {
1807 warn!(
1808 "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1809 (may cause memory leak and eventual inability to accept new connections)"
1810 );
1811 }
1812}
1813
1814#[derive(Debug, Error, Clone, PartialEq, Eq)]
1818pub enum ConnectError {
1819 #[error("endpoint stopping")]
1823 EndpointStopping,
1824 #[error("CIDs exhausted")]
1828 CidsExhausted,
1829 #[error("invalid server name: {0}")]
1831 InvalidServerName(String),
1832 #[error("invalid remote address: {0}")]
1836 InvalidRemoteAddress(SocketAddr),
1837 #[error("no default client config")]
1841 NoDefaultClientConfig,
1842 #[error("unsupported QUIC version")]
1844 UnsupportedVersion,
1845}
1846
1847#[derive(Debug)]
1849pub struct AcceptError {
1850 pub cause: ConnectionError,
1852 pub response: Option<Transmit>,
1854}
1855
1856#[derive(Debug, Error)]
1858#[error("retry() with validated Incoming")]
1859pub struct RetryError(Box<Incoming>);
1860
1861impl RetryError {
1862 pub fn into_incoming(self) -> Incoming {
1864 *self.0
1865 }
1866}
1867
1868#[derive(Default, Debug)]
1873struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1874
1875impl ResetTokenTable {
1876 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1877 self.0
1878 .entry(remote)
1879 .or_default()
1880 .insert(token, ch)
1881 .is_some()
1882 }
1883
1884 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1885 use std::collections::hash_map::Entry;
1886 match self.0.entry(remote) {
1887 Entry::Vacant(_) => {}
1888 Entry::Occupied(mut e) => {
1889 e.get_mut().remove(&token);
1890 if e.get().is_empty() {
1891 e.remove_entry();
1892 }
1893 }
1894 }
1895 }
1896
1897 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1898 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1899 self.0.get(&remote)?.get(&token)
1900 }
1901}
1902
1903#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
1908struct FourTuple {
1909 remote: SocketAddr,
1910 local_ip: Option<IpAddr>,
1912}