1use std::{
2 collections::{HashMap, VecDeque, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21 Side, Transmit, TransportConfig, TransportError,
22 cid_generator::ConnectionIdGenerator,
23 coding::BufMutExt,
24 config::{ClientConfig, EndpointConfig, ServerConfig},
25 connection::{Connection, ConnectionError, SideArgs},
26 crypto::{self, Keys, UnsupportedVersion},
27 frame,
28 nat_traversal_api::PeerId,
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31 PacketNumber, PartialDecode, ProtectedInitialHeader,
32 },
33 shared::{
34 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35 EndpointEvent, EndpointEventInner, IssuedCid,
36 },
37 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38 transport_parameters::{PreferredAddress, TransportParameters},
39};
40
/// A single `PunchMeNow` relay request held in a [`RelayQueue`].
#[derive(Debug, Clone)]
struct RelayQueueItem {
    /// Peer the frame is destined for.
    target_peer_id: PeerId,
    /// The frame to deliver to the target peer.
    frame: frame::PunchMeNow,
    /// Time the request was enqueued; compared against the queue's request timeout.
    created_at: Instant,
    /// Number of times `RelayQueue::next_ready` has handed this item out.
    attempts: u32,
    /// Time of the most recent hand-out, or `None` if the item was never attempted.
    last_attempt: Option<Instant>,
}
55
/// Bounded, rate-limited queue of relay requests that could not be delivered immediately.
#[derive(Debug)]
struct RelayQueue {
    /// Pending requests keyed by a sequence number taken from `next_seq`, kept in insertion order.
    pending: IndexMap<u64, RelayQueueItem>,
    /// Sequence number assigned to the next inserted request.
    next_seq: u64,
    /// `enqueue` rejects new requests once `pending` holds this many entries.
    max_queue_size: usize,
    /// Requests older than this (measured from `created_at`) are treated as expired.
    request_timeout: Duration,
    /// `requeue_failed` drops an item once its attempt count reaches this value.
    max_retries: u32,
    /// Minimum time between two hand-outs of the same item.
    retry_interval: Duration,
    /// Timestamps of accepted requests, grouped by target peer.
    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
    /// Maximum number of accepted requests per target peer within `rate_limit_window`.
    max_relays_per_peer: usize,
    /// Age after which a recorded request no longer counts towards the rate limit.
    rate_limit_window: Duration,
}
78
/// Counters describing address discovery activity.
///
/// NOTE(review): nothing in the visible part of this file updates these counters; the field
/// descriptions below are inferred from the names and should be confirmed against the code
/// that maintains them.
#[derive(Debug, Default, Clone)]
pub struct AddressDiscoveryStats {
    /// Presumably the number of address discovery frames sent.
    pub frames_sent: u64,
    /// Presumably the number of address discovery frames received.
    pub frames_received: u64,
    /// Presumably the number of addresses learned.
    pub addresses_discovered: u64,
    /// Presumably the number of times a known address was observed to change.
    pub address_changes_detected: u64,
}
91
/// Counters describing relay activity on an endpoint.
///
/// NOTE(review): the fields are private, so code outside this module can only observe them
/// through the `Debug` output — confirm whether accessors or `pub` fields are intended.
#[derive(Debug, Default)]
pub struct RelayStats {
    /// Incremented once for every call to `Endpoint::queue_frame_for_peer`.
    requests_received: u64,
    /// Incremented whenever a frame is handed to a connection successfully.
    requests_relayed: u64,
    /// Incremented when handing a frame to a registered connection fails.
    requests_failed: u64,
    /// Incremented when a request is rejected by the queue for a reason other than rate limiting.
    requests_dropped: u64,
    /// Number of queued requests removed because they exceeded the request timeout.
    requests_timed_out: u64,
    /// Incremented when a request is rejected because of the per-peer rate limit.
    requests_rate_limited: u64,
    /// Length of the relay queue as of the last time it was modified by the endpoint.
    current_queue_size: usize,
}
110
111impl RelayQueue {
112 fn new() -> Self {
114 Self {
115 pending: IndexMap::new(),
116 next_seq: 0,
117 max_queue_size: 1000, request_timeout: Duration::from_secs(30), max_retries: 3,
120 retry_interval: Duration::from_millis(500), rate_limiter: HashMap::new(),
122 max_relays_per_peer: 10, rate_limit_window: Duration::from_secs(60), }
125 }
126
127 fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
129 if self.pending.len() >= self.max_queue_size {
131 warn!(
132 "Relay queue full, dropping request for peer {:?}",
133 target_peer_id
134 );
135 return false;
136 }
137
138 if !self.check_rate_limit(target_peer_id, now) {
140 warn!(
141 "Rate limit exceeded for peer {:?}, dropping relay request",
142 target_peer_id
143 );
144 return false;
145 }
146
147 let item = RelayQueueItem {
148 target_peer_id,
149 frame,
150 created_at: now,
151 attempts: 0,
152 last_attempt: None,
153 };
154
155 let seq = self.next_seq;
156 self.next_seq += 1;
157 self.pending.insert(seq, item);
158
159 self.record_relay_request(target_peer_id, now);
161
162 trace!(
163 "Queued relay request for peer {:?}, queue size: {}",
164 target_peer_id,
165 self.pending.len()
166 );
167 true
168 }
169
170 fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
172 self.cleanup_rate_limiter(now);
174
175 if let Some(requests) = self.rate_limiter.get(&peer_id) {
177 requests.len() < self.max_relays_per_peer
178 } else {
179 true }
181 }
182
183 fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
185 self.rate_limiter
186 .entry(peer_id)
187 .or_insert_with(VecDeque::new)
188 .push_back(now);
189 }
190
191 fn cleanup_rate_limiter(&mut self, now: Instant) {
193 self.rate_limiter.retain(|_, requests| {
194 requests.retain(|&request_time| {
195 now.saturating_duration_since(request_time) <= self.rate_limit_window
196 });
197 !requests.is_empty()
198 });
199 }
200
201 fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
203 let mut expired_keys = Vec::new();
205 let mut ready_key = None;
206
207 for (seq, item) in &self.pending {
208 if now.saturating_duration_since(item.created_at) > self.request_timeout {
210 expired_keys.push(*seq);
211 continue;
212 }
213
214 if item.attempts == 0
216 || item.last_attempt.map_or(true, |last| {
217 now.saturating_duration_since(last) >= self.retry_interval
218 })
219 {
220 ready_key = Some(*seq);
221 break;
222 }
223 }
224
225 for key in expired_keys {
227 if let Some(expired) = self.pending.shift_remove(&key) {
228 debug!(
229 "Relay request for peer {:?} timed out after {:?}",
230 expired.target_peer_id,
231 now.saturating_duration_since(expired.created_at)
232 );
233 }
234 }
235
236 if let Some(key) = ready_key {
238 if let Some(mut item) = self.pending.shift_remove(&key) {
239 item.attempts += 1;
240 item.last_attempt = Some(now);
241 return Some(item);
242 }
243 }
244
245 None
246 }
247
248 fn requeue_failed(&mut self, item: RelayQueueItem) {
250 if item.attempts < self.max_retries {
251 trace!(
252 "Requeuing failed relay request for peer {:?}, attempt {}/{}",
253 item.target_peer_id, item.attempts, self.max_retries
254 );
255 let seq = self.next_seq;
256 self.next_seq += 1;
257 self.pending.insert(seq, item);
258 } else {
259 debug!(
260 "Dropping relay request for peer {:?} after {} failed attempts",
261 item.target_peer_id, item.attempts
262 );
263 }
264 }
265
266 fn cleanup_expired(&mut self, now: Instant) -> usize {
268 let initial_len = self.pending.len();
269
270 let expired_keys: Vec<u64> = self
272 .pending
273 .iter()
274 .filter_map(|(seq, item)| {
275 if now.saturating_duration_since(item.created_at) > self.request_timeout {
276 Some(*seq)
277 } else {
278 None
279 }
280 })
281 .collect();
282
283 for key in expired_keys {
285 if let Some(expired) = self.pending.shift_remove(&key) {
286 debug!(
287 "Removing expired relay request for peer {:?}",
288 expired.target_peer_id
289 );
290 }
291 }
292
293 initial_len - self.pending.len()
294 }
295
296 fn len(&self) -> usize {
298 self.pending.len()
299 }
300}
301
/// Connection-independent endpoint state: routes incoming datagrams to connections, creates
/// new connections, and relays `PunchMeNow` frames between registered peers.
pub struct Endpoint {
    /// Random source for version negotiation, stateless resets, tokens and per-connection seeds.
    rng: StdRng,
    /// Lookup tables used to route datagrams to connections or to not-yet-accepted incomings.
    index: ConnectionIndex,
    /// Per-connection bookkeeping, keyed by the index inside a `ConnectionHandle`.
    connections: Slab<ConnectionMeta>,
    /// Generator for locally issued connection IDs.
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration.
    config: Arc<EndpointConfig>,
    /// Server configuration; when `None`, initial packets for unknown connections are not accepted.
    server_config: Option<Arc<ServerConfig>>,
    /// Forwarded unchanged to every `Connection::new` call.
    allow_mtud: bool,
    /// Time the most recent stateless reset was sent, used to rate-limit resets.
    last_stateless_reset: Option<Instant>,
    /// Datagrams buffered for incoming connection attempts that have not been accepted yet.
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` over all entries of `incoming_buffers`.
    all_incoming_buffers_total_bytes: u64,
    /// Maps a registered peer ID to the connection used to reach it.
    peer_connections: HashMap<PeerId, ConnectionHandle>,
    /// Relay requests that could not be delivered immediately.
    relay_queue: RelayQueue,
    /// Counters describing relay activity.
    relay_stats: RelayStats,
    /// Initialised to `true` by `new`; not read anywhere in the visible part of this file.
    address_discovery_enabled: bool,
    /// Initialised to `None` by `new`.
    /// NOTE(review): presumably invoked with (old, new) addresses on a change — confirm.
    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
}
331
332impl Endpoint {
333 pub fn new(
344 config: Arc<EndpointConfig>,
345 server_config: Option<Arc<ServerConfig>>,
346 allow_mtud: bool,
347 rng_seed: Option<[u8; 32]>,
348 ) -> Self {
349 let rng_seed = rng_seed.or(config.rng_seed);
350 Self {
351 rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
352 index: ConnectionIndex::default(),
353 connections: Slab::new(),
354 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
355 config,
356 server_config,
357 allow_mtud,
358 last_stateless_reset: None,
359 incoming_buffers: Slab::new(),
360 all_incoming_buffers_total_bytes: 0,
361 peer_connections: HashMap::new(),
362 relay_queue: RelayQueue::new(),
363 relay_stats: RelayStats::default(),
364 address_discovery_enabled: true, address_change_callback: None,
366 }
367 }
368
369 pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
371 self.server_config = server_config;
372 }
373
374 pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
376 self.peer_connections.insert(peer_id, connection_handle);
377 trace!(
378 "Registered peer {:?} with connection {:?}",
379 peer_id, connection_handle
380 );
381 }
382
383 pub fn unregister_peer(&mut self, peer_id: &PeerId) {
385 if let Some(handle) = self.peer_connections.remove(peer_id) {
386 trace!(
387 "Unregistered peer {:?} from connection {:?}",
388 peer_id, handle
389 );
390 }
391 }
392
393 pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
395 self.peer_connections.get(peer_id).copied()
396 }
397
398 pub(crate) fn queue_frame_for_peer(
400 &mut self,
401 peer_id: &PeerId,
402 frame: frame::PunchMeNow,
403 ) -> bool {
404 self.relay_stats.requests_received += 1;
405
406 if let Some(ch) = self.lookup_peer_connection(peer_id) {
407 if self.relay_frame_to_connection(ch, frame.clone()) {
409 self.relay_stats.requests_relayed += 1;
410 trace!(
411 "Immediately relayed frame to peer {:?} via connection {:?}",
412 peer_id, ch
413 );
414 return true;
415 }
416 }
417
418 let now = Instant::now();
420 if self.relay_queue.enqueue(*peer_id, frame, now) {
421 self.relay_stats.current_queue_size = self.relay_queue.len();
422 trace!("Queued relay request for peer {:?}", peer_id);
423 true
424 } else {
425 if !self.relay_queue.check_rate_limit(*peer_id, now) {
427 self.relay_stats.requests_rate_limited += 1;
428 } else {
429 self.relay_stats.requests_dropped += 1;
430 }
431 false
432 }
433 }
434
435 fn relay_frame_to_connection(
437 &mut self,
438 ch: ConnectionHandle,
439 _frame: frame::PunchMeNow,
440 ) -> bool {
441 trace!("Would relay frame to connection {:?}", ch);
447 true
448 }
449
450 pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
452 if let Some(connection) = self.connections.get_mut(connection_handle.0) {
453 connection.peer_id = Some(peer_id);
454 self.register_peer(peer_id, connection_handle);
455
456 self.process_queued_relays_for_peer(peer_id);
458 }
459 }
460
461 fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
463 let _now = Instant::now();
464 let mut processed = 0;
465
466 let mut items_to_process = Vec::new();
468 let mut keys_to_remove = Vec::new();
469
470 for (seq, item) in &self.relay_queue.pending {
472 if item.target_peer_id == peer_id {
473 items_to_process.push(item.clone());
474 keys_to_remove.push(*seq);
475 }
476 }
477
478 for key in keys_to_remove {
480 self.relay_queue.pending.shift_remove(&key);
481 }
482
483 for item in items_to_process {
485 if let Some(ch) = self.lookup_peer_connection(&peer_id) {
486 if self.relay_frame_to_connection(ch, item.frame.clone()) {
487 self.relay_stats.requests_relayed += 1;
488 processed += 1;
489 trace!("Processed queued relay for peer {:?}", peer_id);
490 } else {
491 self.relay_queue.requeue_failed(item);
493 self.relay_stats.requests_failed += 1;
494 }
495 }
496 }
497
498 self.relay_stats.current_queue_size = self.relay_queue.len();
499
500 if processed > 0 {
501 debug!(
502 "Processed {} queued relay requests for peer {:?}",
503 processed, peer_id
504 );
505 }
506 }
507
508 pub fn process_relay_queue(&mut self) {
510 let now = Instant::now();
511 let mut processed = 0;
512 let mut failed = 0;
513
514 while let Some(item) = self.relay_queue.next_ready(now) {
516 if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
517 if self.relay_frame_to_connection(ch, item.frame.clone()) {
518 self.relay_stats.requests_relayed += 1;
519 processed += 1;
520 trace!(
521 "Successfully relayed frame to peer {:?}",
522 item.target_peer_id
523 );
524 } else {
525 self.relay_queue.requeue_failed(item);
527 self.relay_stats.requests_failed += 1;
528 failed += 1;
529 }
530 } else {
531 self.relay_queue.requeue_failed(item);
533 failed += 1;
534 }
535 }
536
537 let expired = self.relay_queue.cleanup_expired(now);
539 if expired > 0 {
540 self.relay_stats.requests_timed_out += expired as u64;
541 debug!("Cleaned up {} expired relay requests", expired);
542 }
543
544 self.relay_stats.current_queue_size = self.relay_queue.len();
545
546 if processed > 0 || failed > 0 {
547 trace!(
548 "Relay queue processing: {} processed, {} failed, {} in queue",
549 processed,
550 failed,
551 self.relay_queue.len()
552 );
553 }
554 }
555
556 pub fn relay_stats(&self) -> &RelayStats {
558 &self.relay_stats
559 }
560
561 pub fn relay_queue_len(&self) -> usize {
563 self.relay_queue.len()
564 }
565
566 pub fn handle_event(
570 &mut self,
571 ch: ConnectionHandle,
572 event: EndpointEvent,
573 ) -> Option<ConnectionEvent> {
574 use EndpointEventInner::*;
575 match event.0 {
576 EndpointEventInner::NeedIdentifiers(now, n) => {
577 return Some(self.send_new_identifiers(now, ch, n));
578 }
579 ResetToken(remote, token) => {
580 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
581 self.index.connection_reset_tokens.remove(old.0, old.1);
582 }
583 if self.index.connection_reset_tokens.insert(remote, token, ch) {
584 warn!("duplicate reset token");
585 }
586 }
587 RetireConnectionId(now, seq, allow_more_cids) => {
588 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
589 trace!("peer retired CID {}: {}", seq, cid);
590 self.index.retire(cid);
591 if allow_more_cids {
592 return Some(self.send_new_identifiers(now, ch, 1));
593 }
594 }
595 }
596 RelayPunchMeNow(target_peer_id, punch_me_now) => {
597 let peer_id = PeerId(target_peer_id);
599 if self.queue_frame_for_peer(&peer_id, punch_me_now) {
600 trace!(
601 "Successfully queued PunchMeNow frame for relay to peer {:?}",
602 peer_id
603 );
604 } else {
605 warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
606 }
607 }
608 SendAddressFrame(add_address_frame) => {
609 trace!(
611 "Sending ADD_ADDRESS frame: seq={}, addr={}, priority={}",
612 add_address_frame.sequence,
613 add_address_frame.address,
614 add_address_frame.priority
615 );
616
617 debug!(
620 "ADD_ADDRESS frame ready for transmission: {:?}",
621 add_address_frame
622 );
623 }
624 NatCandidateValidated { address, challenge } => {
625 trace!(
627 "NAT candidate validation succeeded for {} with challenge {:016x}",
628 address, challenge
629 );
630
631 debug!("NAT candidate {} validated successfully", address);
635 }
636 Drained => {
637 if let Some(conn) = self.connections.try_remove(ch.0) {
638 self.index.remove(&conn);
639 if let Some(peer_id) = conn.peer_id {
641 self.peer_connections.remove(&peer_id);
642 trace!("Cleaned up peer connection mapping for {:?}", peer_id);
643 }
644 } else {
645 error!(id = ch.0, "unknown connection drained");
649 }
650 }
651 }
652 None
653 }
654
/// Processes an incoming UDP datagram.
///
/// `remote` and `local_ip` describe the path the datagram arrived on, `ecn` its ECN marking
/// and `data` its payload. `buf` receives the bytes of any response this call produces.
///
/// Returns `None` when the datagram is dropped or merely buffered, a
/// `DatagramEvent::ConnectionEvent` when it belongs to an existing connection, a
/// `DatagramEvent::NewConnection` for an acceptable first packet, or a
/// `DatagramEvent::Response` when a packet written to `buf` should be sent back.
pub fn handle(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    local_ip: Option<IpAddr>,
    ecn: Option<EcnCodepoint>,
    data: BytesMut,
    buf: &mut Vec<u8>,
) -> Option<DatagramEvent> {
    let datagram_len = data.len();
    // Decode just enough of the first packet to route the datagram.
    let event = match PartialDecode::new(
        data,
        &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
        &self.config.supported_versions,
        self.config.grease_quic_bit,
    ) {
        Ok((first_decode, remaining)) => DatagramConnectionEvent {
            now,
            remote,
            ecn,
            first_decode,
            remaining,
        },
        Err(PacketDecodeError::UnsupportedVersion {
            src_cid,
            dst_cid,
            version,
        }) => {
            // Only an endpoint with a server configuration answers with version negotiation.
            if self.server_config.is_none() {
                debug!("dropping packet with unsupported version");
                return None;
            }
            trace!("sending version negotiation");
            // The peer's CIDs are swapped for the reply.
            Header::VersionNegotiate {
                random: self.rng.gen::<u8>() | 0x40,
                src_cid: dst_cid,
                dst_cid: src_cid,
            }
            .encode(buf);
            // First listed version is a placeholder chosen to differ from the one the peer
            // sent; presumably a reserved "grease" version — confirm against the QUIC spec.
            buf.write::<u32>(match version {
                0x0a1a_2a3a => 0x0a1a_2a4a,
                _ => 0x0a1a_2a3a,
            });
            for &version in &self.config.supported_versions {
                buf.write(version);
            }
            return Some(DatagramEvent::Response(Transmit {
                destination: remote,
                ecn: None,
                size: buf.len(),
                segment_size: None,
                src_ip: local_ip,
            }));
        }
        Err(e) => {
            trace!("malformed header: {}", e);
            return None;
        }
    };

    let addresses = FourTuple { remote, local_ip };
    let dst_cid = event.first_decode.dst_cid();

    if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
        match route_to {
            // Belongs to a connection attempt that has not been accepted yet: buffer the
            // datagram if both the per-attempt and the global byte limits allow it,
            // otherwise drop it silently.
            RouteDatagramTo::Incoming(incoming_idx) => {
                let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
                let config = &self.server_config.as_ref().unwrap();

                if incoming_buffer
                    .total_bytes
                    .checked_add(datagram_len as u64)
                    .is_some_and(|n| n <= config.incoming_buffer_size)
                    && self
                        .all_incoming_buffers_total_bytes
                        .checked_add(datagram_len as u64)
                        .is_some_and(|n| n <= config.incoming_buffer_size_total)
                {
                    incoming_buffer.datagrams.push(event);
                    incoming_buffer.total_bytes += datagram_len as u64;
                    self.all_incoming_buffers_total_bytes += datagram_len as u64;
                }

                None
            }
            RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
                ch,
                ConnectionEvent(ConnectionEventInner::Datagram(event)),
            )),
        }
    } else if event.first_decode.initial_header().is_some() {
        // Initial packet for an unknown connection: potentially a new connection attempt.
        self.handle_first_packet(datagram_len, event, addresses, buf)
    } else if event.first_decode.has_long_header() {
        debug!(
            "ignoring non-initial packet for unknown connection {}",
            dst_cid
        );
        None
    } else if !event.first_decode.is_initial()
        && self.local_cid_generator.validate(dst_cid).is_err()
    {
        // The CID was not produced by our generator, so no stateless reset is sent for it.
        debug!("dropping packet with invalid CID");
        None
    } else if dst_cid.is_empty() {
        trace!("dropping unrecognized short packet without ID");
        None
    } else {
        // Short-header packet with a plausible CID but no matching connection.
        self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
            .map(DatagramEvent::Response)
    }
}
776
/// Writes a stateless reset for `dst_cid` into `buf`, in response to a datagram of
/// `inciting_dgram_len` bytes.
///
/// Returns `None` (leaving `buf` untouched) when the previous reset was sent less than
/// `config.min_reset_interval` ago, or when the inciting datagram is too small for a reset
/// that is strictly shorter than it.
fn stateless_reset(
    &mut self,
    now: Instant,
    inciting_dgram_len: usize,
    addresses: FourTuple,
    dst_cid: ConnectionId,
    buf: &mut Vec<u8>,
) -> Option<Transmit> {
    if self
        .last_stateless_reset
        .is_some_and(|last| last + self.config.min_reset_interval > now)
    {
        debug!("ignoring unexpected packet within minimum stateless reset interval");
        return None;
    }

    // Minimum number of bytes placed in front of the reset token.
    const MIN_PADDING_LEN: usize = 5;

    // The reset (padding + token) must stay at least one byte shorter than the inciting
    // datagram, hence `headroom - 1`.
    let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
        Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
        _ => {
            debug!(
                "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
                inciting_dgram_len
            );
            return None;
        }
    };

    debug!(
        "sending stateless reset for {} to {}",
        dst_cid, addresses.remote
    );
    self.last_stateless_reset = Some(now);
    // Preferred lower bound for the padding when the inciting datagram leaves enough room.
    const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
    let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
        max_padding_len
    } else {
        self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
    };
    buf.reserve(padding_len + RESET_TOKEN_SIZE);
    buf.resize(padding_len, 0);
    self.rng.fill_bytes(&mut buf[0..padding_len]);
    // Force the two most significant bits of the first byte to `01`, keeping the remaining
    // six bits random.
    buf[0] = 0b0100_0000 | (buf[0] >> 2);
    buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));

    debug_assert!(buf.len() < inciting_dgram_len);

    Some(Transmit {
        destination: addresses.remote,
        ecn: None,
        size: buf.len(),
        segment_size: None,
        src_ip: addresses.local_ip,
    })
}
837
838 pub fn connect(
840 &mut self,
841 now: Instant,
842 config: ClientConfig,
843 remote: SocketAddr,
844 server_name: &str,
845 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
846 if self.cids_exhausted() {
847 return Err(ConnectError::CidsExhausted);
848 }
849 if remote.port() == 0 || remote.ip().is_unspecified() {
850 return Err(ConnectError::InvalidRemoteAddress(remote));
851 }
852 if !self.config.supported_versions.contains(&config.version) {
853 return Err(ConnectError::UnsupportedVersion);
854 }
855
856 let remote_id = (config.initial_dst_cid_provider)();
857 trace!(initial_dcid = %remote_id);
858
859 let ch = ConnectionHandle(self.connections.vacant_key());
860 let loc_cid = self.new_cid(ch);
861 let params = TransportParameters::new(
862 &config.transport,
863 &self.config,
864 self.local_cid_generator.as_ref(),
865 loc_cid,
866 None,
867 &mut self.rng,
868 );
869 let tls = config
870 .crypto
871 .start_session(config.version, server_name, ¶ms)?;
872
873 let conn = self.add_connection(
874 ch,
875 config.version,
876 remote_id,
877 loc_cid,
878 remote_id,
879 FourTuple {
880 remote,
881 local_ip: None,
882 },
883 now,
884 tls,
885 config.transport,
886 SideArgs::Client {
887 token_store: config.token_store,
888 server_name: server_name.into(),
889 },
890 );
891 Ok((ch, conn))
892 }
893
894 fn send_new_identifiers(
895 &mut self,
896 now: Instant,
897 ch: ConnectionHandle,
898 num: u64,
899 ) -> ConnectionEvent {
900 let mut ids = vec![];
901 for _ in 0..num {
902 let id = self.new_cid(ch);
903 let meta = &mut self.connections[ch];
904 let sequence = meta.cids_issued;
905 meta.cids_issued += 1;
906 meta.loc_cids.insert(sequence, id);
907 ids.push(IssuedCid {
908 sequence,
909 id,
910 reset_token: ResetToken::new(&*self.config.reset_key, id),
911 });
912 }
913 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
914 }
915
916 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
918 loop {
919 let cid = self.local_cid_generator.generate_cid();
920 if cid.is_empty() {
921 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
923 return cid;
924 }
925 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
926 e.insert(ch);
927 break cid;
928 }
929 }
930 }
931
/// Handles an Initial packet that does not belong to any known connection.
///
/// Returns a `DatagramEvent::NewConnection` carrying an `Incoming` when the packet is an
/// acceptable connection attempt, a `DatagramEvent::Response` when a packet written to `buf`
/// (stateless reset or close) should be sent, and `None` when the packet is dropped.
///
/// # Panics
///
/// Panics if `event.first_decode` has no initial header, or if the fully decoded packet does
/// not carry an Initial header.
fn handle_first_packet(
    &mut self,
    datagram_len: usize,
    event: DatagramConnectionEvent,
    addresses: FourTuple,
    buf: &mut Vec<u8>,
) -> Option<DatagramEvent> {
    let dst_cid = event.first_decode.dst_cid();
    let header = event.first_decode.initial_header().unwrap();

    // Without a server configuration, incoming connections cannot be accepted.
    let Some(server_config) = &self.server_config else {
        debug!("packet for unrecognized connection {}", dst_cid);
        return self
            .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
            .map(DatagramEvent::Response);
    };

    if datagram_len < MIN_INITIAL_SIZE as usize {
        debug!("ignoring short initial for connection {}", dst_cid);
        return None;
    }

    let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
        Ok(keys) => keys,
        Err(UnsupportedVersion) => {
            debug!(
                "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                header.version
            );
            return None;
        }
    };

    // Checks that need only the protected header; a failure is answered with a close packet.
    if let Err(reason) = self.early_validate_first_packet(header) {
        return Some(DatagramEvent::Response(self.initial_close(
            header.version,
            addresses,
            &crypto,
            &header.src_cid,
            reason,
            buf,
        )));
    }

    // Finish decoding the packet using the remote header key.
    let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
        Ok(packet) => packet,
        Err(e) => {
            trace!("unable to decode initial packet: {}", e);
            return None;
        }
    };

    if !packet.reserved_bits_valid() {
        debug!("dropping connection attempt with invalid reserved bits");
        return None;
    }

    let Header::Initial(header) = packet.header else {
        panic!("non-initial packet in handle_first_packet()");
    };

    // Clone the `Arc` so the configuration can be used while `self` is borrowed mutably below.
    let server_config = self.server_config.as_ref().unwrap().clone();

    let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
        Ok(token) => token,
        Err(InvalidRetryTokenError) => {
            debug!("rejecting invalid retry token");
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                addresses,
                &crypto,
                &header.src_cid,
                TransportError::INVALID_TOKEN(""),
                buf,
            )));
        }
    };

    // Allocate a buffer for datagrams that arrive before the application decides what to do
    // with this attempt, and route the initial destination CID to it.
    let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
    self.index
        .insert_initial_incoming(header.dst_cid, incoming_idx);

    Some(DatagramEvent::NewConnection(Incoming {
        received_at: event.now,
        addresses,
        ecn: event.ecn,
        packet: InitialPacket {
            header,
            header_data: packet.header_data,
            payload: packet.payload,
        },
        rest: event.remaining,
        crypto,
        token,
        incoming_idx,
        improper_drop_warner: IncomingImproperDropWarner,
    }))
}
1032
1033 #[allow(clippy::result_large_err)]
1036 pub fn accept(
1037 &mut self,
1038 mut incoming: Incoming,
1039 now: Instant,
1040 buf: &mut Vec<u8>,
1041 server_config: Option<Arc<ServerConfig>>,
1042 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1043 let remote_address_validated = incoming.remote_address_validated();
1044 incoming.improper_drop_warner.dismiss();
1045 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1046 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1047
1048 let packet_number = incoming.packet.header.number.expand(0);
1049 let InitialHeader {
1050 src_cid,
1051 dst_cid,
1052 version,
1053 ..
1054 } = incoming.packet.header;
1055 let server_config =
1056 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
1057
1058 if server_config
1059 .transport
1060 .max_idle_timeout
1061 .is_some_and(|timeout| {
1062 incoming.received_at + Duration::from_millis(timeout.into()) <= now
1063 })
1064 {
1065 debug!("abandoning accept of stale initial");
1066 self.index.remove_initial(dst_cid);
1067 return Err(AcceptError {
1068 cause: ConnectionError::TimedOut,
1069 response: None,
1070 });
1071 }
1072
1073 if self.cids_exhausted() {
1074 debug!("refusing connection");
1075 self.index.remove_initial(dst_cid);
1076 return Err(AcceptError {
1077 cause: ConnectionError::CidsExhausted,
1078 response: Some(self.initial_close(
1079 version,
1080 incoming.addresses,
1081 &incoming.crypto,
1082 &src_cid,
1083 TransportError::CONNECTION_REFUSED(""),
1084 buf,
1085 )),
1086 });
1087 }
1088
1089 if incoming
1090 .crypto
1091 .packet
1092 .remote
1093 .decrypt(
1094 packet_number,
1095 &incoming.packet.header_data,
1096 &mut incoming.packet.payload,
1097 )
1098 .is_err()
1099 {
1100 debug!(packet_number, "failed to authenticate initial packet");
1101 self.index.remove_initial(dst_cid);
1102 return Err(AcceptError {
1103 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1104 response: None,
1105 });
1106 };
1107
1108 let ch = ConnectionHandle(self.connections.vacant_key());
1109 let loc_cid = self.new_cid(ch);
1110 let mut params = TransportParameters::new(
1111 &server_config.transport,
1112 &self.config,
1113 self.local_cid_generator.as_ref(),
1114 loc_cid,
1115 Some(&server_config),
1116 &mut self.rng,
1117 );
1118 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1119 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1120 params.retry_src_cid = incoming.token.retry_src_cid;
1121 let mut pref_addr_cid = None;
1122 if server_config.has_preferred_address() {
1123 let cid = self.new_cid(ch);
1124 pref_addr_cid = Some(cid);
1125 params.preferred_address = Some(PreferredAddress {
1126 address_v4: server_config.preferred_address_v4,
1127 address_v6: server_config.preferred_address_v6,
1128 connection_id: cid,
1129 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1130 });
1131 }
1132
1133 let tls = server_config.crypto.clone().start_session(version, ¶ms);
1134 let transport_config = server_config.transport.clone();
1135 let mut conn = self.add_connection(
1136 ch,
1137 version,
1138 dst_cid,
1139 loc_cid,
1140 src_cid,
1141 incoming.addresses,
1142 incoming.received_at,
1143 tls,
1144 transport_config,
1145 SideArgs::Server {
1146 server_config,
1147 pref_addr_cid,
1148 path_validated: remote_address_validated,
1149 },
1150 );
1151 self.index.insert_initial(dst_cid, ch);
1152
1153 match conn.handle_first_packet(
1154 incoming.received_at,
1155 incoming.addresses.remote,
1156 incoming.ecn,
1157 packet_number,
1158 incoming.packet,
1159 incoming.rest,
1160 ) {
1161 Ok(()) => {
1162 trace!(id = ch.0, icid = %dst_cid, "new connection");
1163
1164 for event in incoming_buffer.datagrams {
1165 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1166 }
1167
1168 Ok((ch, conn))
1169 }
1170 Err(e) => {
1171 debug!("handshake failed: {}", e);
1172 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1173 let response = match e {
1174 ConnectionError::TransportError(ref e) => Some(self.initial_close(
1175 version,
1176 incoming.addresses,
1177 &incoming.crypto,
1178 &src_cid,
1179 e.clone(),
1180 buf,
1181 )),
1182 _ => None,
1183 };
1184 Err(AcceptError { cause: e, response })
1185 }
1186 }
1187 }
1188
1189 fn early_validate_first_packet(
1191 &mut self,
1192 header: &ProtectedInitialHeader,
1193 ) -> Result<(), TransportError> {
1194 let config = &self.server_config.as_ref().unwrap();
1195 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1196 return Err(TransportError::CONNECTION_REFUSED(""));
1197 }
1198
1199 if header.dst_cid.len() < 8
1204 && (header.token_pos.is_empty()
1205 || header.dst_cid.len() != self.local_cid_generator.cid_len())
1206 {
1207 debug!(
1208 "rejecting connection due to invalid DCID length {}",
1209 header.dst_cid.len()
1210 );
1211 return Err(TransportError::PROTOCOL_VIOLATION(
1212 "invalid destination CID length",
1213 ));
1214 }
1215
1216 Ok(())
1217 }
1218
1219 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1221 self.clean_up_incoming(&incoming);
1222 incoming.improper_drop_warner.dismiss();
1223
1224 self.initial_close(
1225 incoming.packet.header.version,
1226 incoming.addresses,
1227 &incoming.crypto,
1228 &incoming.packet.header.src_cid,
1229 TransportError::CONNECTION_REFUSED(""),
1230 buf,
1231 )
1232 }
1233
/// Answers `incoming` with a Retry packet written into `buf`.
///
/// # Errors
///
/// Returns the `Incoming` back inside a `RetryError` when `incoming.may_retry()` is false;
/// in that case no endpoint state is modified.
///
/// # Panics
///
/// Panics if the endpoint has no server configuration.
pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
    if !incoming.may_retry() {
        return Err(RetryError(Box::new(incoming)));
    }

    self.clean_up_incoming(&incoming);
    incoming.improper_drop_warner.dismiss();

    let server_config = self.server_config.as_ref().unwrap();

    // Fresh CID used as the Retry packet's source CID; it is not added to the connection
    // index here.
    let loc_cid = self.local_cid_generator.generate_cid();

    // The token binds the client's address and its original destination CID.
    let payload = TokenPayload::Retry {
        address: incoming.addresses.remote,
        orig_dst_cid: incoming.packet.header.dst_cid,
        issued: server_config.time_source.now(),
    };
    let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);

    let header = Header::Retry {
        src_cid: loc_cid,
        dst_cid: incoming.packet.header.src_cid,
        version: incoming.packet.header.version,
    };

    // Layout: header, token, then the retry tag computed over what has been written so far.
    let encode = header.encode(buf);
    buf.put_slice(&token);
    buf.extend_from_slice(&server_config.crypto.retry_tag(
        incoming.packet.header.version,
        &incoming.packet.header.dst_cid,
        buf,
    ));
    encode.finish(buf, &*incoming.crypto.header.local, None);

    Ok(Transmit {
        destination: incoming.addresses.remote,
        ecn: None,
        size: buf.len(),
        segment_size: None,
        src_ip: incoming.addresses.local_ip,
    })
}
1285
1286 pub fn ignore(&mut self, incoming: Incoming) {
1291 self.clean_up_incoming(&incoming);
1292 incoming.improper_drop_warner.dismiss();
1293 }
1294
1295 fn clean_up_incoming(&mut self, incoming: &Incoming) {
1297 self.index.remove_initial(incoming.packet.header.dst_cid);
1298 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1299 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1300 }
1301
/// Creates a `Connection` and registers its bookkeeping under the handle `ch`.
///
/// `ch` must wrap the slab key the new entry will receive (obtained from `vacant_key()`
/// beforehand); this is checked with a debug assertion. `loc_cid` is recorded as the
/// connection's CID with sequence number 0 and, if `side_args` carries a preferred-address
/// CID, that CID is recorded with sequence number 1.
fn add_connection(
    &mut self,
    ch: ConnectionHandle,
    version: u32,
    init_cid: ConnectionId,
    loc_cid: ConnectionId,
    rem_cid: ConnectionId,
    addresses: FourTuple,
    now: Instant,
    tls: Box<dyn crypto::Session>,
    transport_config: Arc<TransportConfig>,
    side_args: SideArgs,
) -> Connection {
    // Give the connection its own RNG seed derived from the endpoint's RNG.
    let mut rng_seed = [0; 32];
    self.rng.fill_bytes(&mut rng_seed);
    // Read these before `side_args` is moved into the connection.
    let side = side_args.side();
    let pref_addr_cid = side_args.pref_addr_cid();
    let conn = Connection::new(
        self.config.clone(),
        transport_config,
        init_cid,
        loc_cid,
        rem_cid,
        addresses.remote,
        addresses.local_ip,
        tls,
        self.local_cid_generator.as_ref(),
        now,
        version,
        self.allow_mtud,
        rng_seed,
        side_args,
    );

    let mut cids_issued = 0;
    let mut loc_cids = FxHashMap::default();

    loc_cids.insert(cids_issued, loc_cid);
    cids_issued += 1;

    if let Some(cid) = pref_addr_cid {
        debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
        loc_cids.insert(cids_issued, cid);
        cids_issued += 1;
    }

    let id = self.connections.insert(ConnectionMeta {
        init_cid,
        cids_issued,
        loc_cids,
        addresses,
        side,
        reset_token: None,
        peer_id: None,
    });
    debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");

    self.index.insert_conn(addresses, loc_cid, ch, side);

    conn
}
1363
1364 fn initial_close(
1365 &mut self,
1366 version: u32,
1367 addresses: FourTuple,
1368 crypto: &Keys,
1369 remote_id: &ConnectionId,
1370 reason: TransportError,
1371 buf: &mut Vec<u8>,
1372 ) -> Transmit {
1373 let local_id = self.local_cid_generator.generate_cid();
1377 let number = PacketNumber::U8(0);
1378 let header = Header::Initial(InitialHeader {
1379 dst_cid: *remote_id,
1380 src_cid: local_id,
1381 number,
1382 token: Bytes::new(),
1383 version,
1384 });
1385
1386 let partial_encode = header.encode(buf);
1387 let max_len =
1388 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1389 frame::Close::from(reason).encode(buf, max_len);
1390 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1391 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1392 Transmit {
1393 destination: addresses.remote,
1394 ecn: None,
1395 size: buf.len(),
1396 segment_size: None,
1397 src_ip: addresses.local_ip,
1398 }
1399 }
1400
1401 pub fn config(&self) -> &EndpointConfig {
1403 &self.config
1404 }
1405
1406 pub fn enable_address_discovery(&mut self, enabled: bool) {
1413 self.address_discovery_enabled = enabled;
1414 }
1417
1418 pub fn address_discovery_enabled(&self) -> bool {
1420 self.address_discovery_enabled
1421 }
1422
1423 pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1431 Vec::new()
1433 }
1434
1435 pub fn set_address_change_callback<F>(&mut self, callback: F)
1440 where
1441 F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1442 {
1443 self.address_change_callback = Some(Box::new(callback));
1444 }
1445
1446 pub fn clear_address_change_callback(&mut self) {
1448 self.address_change_callback = None;
1449 }
1450
1451 pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1456 AddressDiscoveryStats::default()
1458 }
1459
1460 pub fn open_connections(&self) -> usize {
1462 self.connections.len()
1463 }
1464
1465 pub fn incoming_buffer_bytes(&self) -> u64 {
1468 self.all_incoming_buffers_total_bytes
1469 }
1470
/// Test helper: number of tracked connections, with index sanity checks.
///
/// In debug builds this asserts that the initial-CID table has exactly one
/// entry per connection and that none of the reset-token, incoming-remote or
/// outgoing-remote tables holds more entries than there are connections.
#[cfg(test)]
pub(crate) fn known_connections(&self) -> usize {
    let x = self.connections.len();

    // Exactly one initial-CID route per connection.
    debug_assert_eq!(x, self.index.connection_ids_initial.len());

    // The remaining tables are optional per connection, so they may be smaller.
    debug_assert!(x >= self.index.connection_reset_tokens.0.len());
    debug_assert!(x >= self.index.incoming_connection_remotes.len());
    debug_assert!(x >= self.index.outgoing_connection_remotes.len());

    x
}
1482
/// Test helper: number of local connection IDs currently indexed for routing.
#[cfg(test)]
pub(crate) fn known_cids(&self) -> usize {
    self.index.connection_ids.len()
}
1487
1488 fn cids_exhausted(&self) -> bool {
1493 self.local_cid_generator.cid_len() <= 4
1494 && self.local_cid_generator.cid_len() != 0
1495 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1496 - self.index.connection_ids.len())
1497 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1498 }
1499}
1500
1501impl fmt::Debug for Endpoint {
1502 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1503 fmt.debug_struct("Endpoint")
1504 .field("rng", &self.rng)
1505 .field("index", &self.index)
1506 .field("connections", &self.connections)
1507 .field("config", &self.config)
1508 .field("server_config", &self.server_config)
1509 .field("incoming_buffers.len", &self.incoming_buffers.len())
1511 .field(
1512 "all_incoming_buffers_total_bytes",
1513 &self.all_incoming_buffers_total_bytes,
1514 )
1515 .finish()
1516 }
1517}
1518
1519#[derive(Default)]
1521struct IncomingBuffer {
1522 datagrams: Vec<DatagramConnectionEvent>,
1523 total_bytes: u64,
1524}
1525
1526#[derive(Copy, Clone, Debug)]
1528enum RouteDatagramTo {
1529 Incoming(usize),
1530 Connection(ConnectionHandle),
1531}
1532
1533#[derive(Default, Debug)]
1535struct ConnectionIndex {
1536 connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1542 connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1546 incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1550 outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1559 connection_reset_tokens: ResetTokenTable,
1564}
1565
1566impl ConnectionIndex {
1567 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1569 if dst_cid.is_empty() {
1570 return;
1571 }
1572 self.connection_ids_initial
1573 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1574 }
1575
1576 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1578 if dst_cid.is_empty() {
1579 return;
1580 }
1581 let removed = self.connection_ids_initial.remove(&dst_cid);
1582 debug_assert!(removed.is_some());
1583 }
1584
1585 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1587 if dst_cid.is_empty() {
1588 return;
1589 }
1590 self.connection_ids_initial
1591 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1592 }
1593
1594 fn insert_conn(
1597 &mut self,
1598 addresses: FourTuple,
1599 dst_cid: ConnectionId,
1600 connection: ConnectionHandle,
1601 side: Side,
1602 ) {
1603 match dst_cid.len() {
1604 0 => match side {
1605 Side::Server => {
1606 self.incoming_connection_remotes
1607 .insert(addresses, connection);
1608 }
1609 Side::Client => {
1610 self.outgoing_connection_remotes
1611 .insert(addresses.remote, connection);
1612 }
1613 },
1614 _ => {
1615 self.connection_ids.insert(dst_cid, connection);
1616 }
1617 }
1618 }
1619
1620 fn retire(&mut self, dst_cid: ConnectionId) {
1622 self.connection_ids.remove(&dst_cid);
1623 }
1624
1625 fn remove(&mut self, conn: &ConnectionMeta) {
1627 if conn.side.is_server() {
1628 self.remove_initial(conn.init_cid);
1629 }
1630 for cid in conn.loc_cids.values() {
1631 self.connection_ids.remove(cid);
1632 }
1633 self.incoming_connection_remotes.remove(&conn.addresses);
1634 self.outgoing_connection_remotes
1635 .remove(&conn.addresses.remote);
1636 if let Some((remote, token)) = conn.reset_token {
1637 self.connection_reset_tokens.remove(remote, token);
1638 }
1639 }
1640
1641 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1643 let dst_cid = datagram.dst_cid();
1644 let is_empty_cid = dst_cid.is_empty();
1645
1646 if !is_empty_cid {
1648 if let Some(&ch) = self.connection_ids.get(dst_cid) {
1649 return Some(RouteDatagramTo::Connection(ch));
1650 }
1651 }
1652
1653 if datagram.is_initial() || datagram.is_0rtt() {
1655 if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1656 return Some(ch);
1657 }
1658 }
1659
1660 if is_empty_cid {
1662 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1664 return Some(RouteDatagramTo::Connection(ch));
1665 }
1666 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1667 return Some(RouteDatagramTo::Connection(ch));
1668 }
1669 }
1670
1671 let data = datagram.data();
1673 if data.len() < RESET_TOKEN_SIZE {
1674 return None;
1675 }
1676 self.connection_reset_tokens
1677 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1678 .cloned()
1679 .map(RouteDatagramTo::Connection)
1680 }
1681}
1682
1683#[derive(Debug)]
1684pub(crate) struct ConnectionMeta {
1685 init_cid: ConnectionId,
1686 cids_issued: u64,
1688 loc_cids: FxHashMap<u64, ConnectionId>,
1689 addresses: FourTuple,
1694 side: Side,
1695 reset_token: Option<(SocketAddr, ResetToken)>,
1698 peer_id: Option<PeerId>,
1700}
1701
/// Identifier of a connection within an endpoint.
///
/// Wraps the key of the connection's entry in the endpoint's connection slab.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1705
1706impl From<ConnectionHandle> for usize {
1707 fn from(x: ConnectionHandle) -> Self {
1708 x.0
1709 }
1710}
1711
1712impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1713 type Output = ConnectionMeta;
1714 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1715 &self[ch.0]
1716 }
1717}
1718
1719impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1720 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1721 &mut self[ch.0]
1722 }
1723}
1724
1725pub enum DatagramEvent {
1727 ConnectionEvent(ConnectionHandle, ConnectionEvent),
1729 NewConnection(Incoming),
1731 Response(Transmit),
1733}
1734
1735pub struct Incoming {
1737 received_at: Instant,
1738 addresses: FourTuple,
1739 ecn: Option<EcnCodepoint>,
1740 packet: InitialPacket,
1741 rest: Option<BytesMut>,
1742 crypto: Keys,
1743 token: IncomingToken,
1744 incoming_idx: usize,
1745 improper_drop_warner: IncomingImproperDropWarner,
1746}
1747
1748impl Incoming {
1749 pub fn local_ip(&self) -> Option<IpAddr> {
1753 self.addresses.local_ip
1754 }
1755
1756 pub fn remote_address(&self) -> SocketAddr {
1758 self.addresses.remote
1759 }
1760
1761 pub fn remote_address_validated(&self) -> bool {
1769 self.token.validated
1770 }
1771
1772 pub fn may_retry(&self) -> bool {
1777 self.token.retry_src_cid.is_none()
1778 }
1779
1780 pub fn orig_dst_cid(&self) -> &ConnectionId {
1782 &self.token.orig_dst_cid
1783 }
1784}
1785
1786impl fmt::Debug for Incoming {
1787 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1788 f.debug_struct("Incoming")
1789 .field("addresses", &self.addresses)
1790 .field("ecn", &self.ecn)
1791 .field("token", &self.token)
1794 .field("incoming_idx", &self.incoming_idx)
1795 .finish_non_exhaustive()
1797 }
1798}
1799
/// Drop guard that logs a warning when it is dropped instead of dismissed.
struct IncomingImproperDropWarner;
1801
1802impl IncomingImproperDropWarner {
1803 fn dismiss(self) {
1804 mem::forget(self);
1805 }
1806}
1807
1808impl Drop for IncomingImproperDropWarner {
1809 fn drop(&mut self) {
1810 warn!(
1811 "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1812 (may cause memory leak and eventual inability to accept new connections)"
1813 );
1814 }
1815}
1816
1817#[derive(Debug, Error, Clone, PartialEq, Eq)]
1821pub enum ConnectError {
1822 #[error("endpoint stopping")]
1826 EndpointStopping,
1827 #[error("CIDs exhausted")]
1831 CidsExhausted,
1832 #[error("invalid server name: {0}")]
1834 InvalidServerName(String),
1835 #[error("invalid remote address: {0}")]
1839 InvalidRemoteAddress(SocketAddr),
1840 #[error("no default client config")]
1844 NoDefaultClientConfig,
1845 #[error("unsupported QUIC version")]
1847 UnsupportedVersion,
1848}
1849
1850#[derive(Debug)]
1852pub struct AcceptError {
1853 pub cause: ConnectionError,
1855 pub response: Option<Transmit>,
1857}
1858
1859#[derive(Debug, Error)]
1861#[error("retry() with validated Incoming")]
1862pub struct RetryError(Box<Incoming>);
1863
1864impl RetryError {
1865 pub fn into_incoming(self) -> Incoming {
1867 *self.0
1868 }
1869}
1870
1871#[derive(Default, Debug)]
1876struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1877
1878impl ResetTokenTable {
1879 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1880 self.0
1881 .entry(remote)
1882 .or_default()
1883 .insert(token, ch)
1884 .is_some()
1885 }
1886
1887 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1888 use std::collections::hash_map::Entry;
1889 match self.0.entry(remote) {
1890 Entry::Vacant(_) => {}
1891 Entry::Occupied(mut e) => {
1892 e.get_mut().remove(&token);
1893 if e.get().is_empty() {
1894 e.remove_entry();
1895 }
1896 }
1897 }
1898 }
1899
1900 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1901 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1902 self.0.get(&remote)?.get(&token)
1903 }
1904}
1905
/// Address information identifying the network path of a datagram or connection.
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
    /// Address of the remote peer.
    remote: SocketAddr,
    /// Local IP address, when known.
    local_ip: Option<IpAddr>,
}