1use std::{
9 collections::{HashMap, VecDeque, hash_map},
10 convert::TryFrom,
11 fmt, mem,
12 net::{IpAddr, SocketAddr},
13 ops::{Index, IndexMut},
14 sync::Arc,
15};
16
17use indexmap::IndexMap;
18
19use bytes::{BufMut, Bytes, BytesMut};
20use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
21use rustc_hash::FxHashMap;
22use rustls;
23use slab::Slab;
24use thiserror::Error;
25use tracing::{debug, error, trace, warn};
26
27use crate::{
28 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
29 Side, Transmit, TransportConfig, TransportError,
30 cid_generator::ConnectionIdGenerator,
31 coding::BufMutExt,
32 config::{ClientConfig, EndpointConfig, ServerConfig},
33 connection::{Connection, ConnectionError, SideArgs},
34 crypto::{self, Keys, UnsupportedVersion},
35 frame,
36 nat_traversal_api::PeerId,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
39 PacketNumber, PartialDecode, ProtectedInitialHeader,
40 },
41 relay::RelayStatisticsCollector,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner, IssuedCid,
45 },
46 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
47 transport_parameters::{PreferredAddress, TransportParameters},
48};
49
/// One PunchMeNow relay request waiting in the [`RelayQueue`].
#[derive(Debug, Clone)]
struct RelayQueueItem {
    /// Peer the frame should be delivered to.
    target_peer_id: PeerId,
    /// The frame to relay.
    frame: frame::PunchMeNow,
    /// Time the request was first enqueued; the request expires once its age
    /// exceeds the queue's `request_timeout`.
    created_at: Instant,
    /// Number of times `RelayQueue::next_ready` has handed this item out.
    attempts: u32,
    /// Time of the most recent hand-out, or `None` if it was never attempted.
    last_attempt: Option<Instant>,
}
64
/// Bounded, insertion-ordered queue of relay requests with per-peer rate limiting.
#[derive(Debug)]
struct RelayQueue {
    /// Pending requests keyed by a monotonically increasing sequence number;
    /// iteration order is insertion order.
    pending: IndexMap<u64, RelayQueueItem>,
    /// Sequence number assigned to the next inserted item.
    next_seq: u64,
    /// `enqueue` rejects new requests once `pending` holds this many items.
    max_queue_size: usize,
    /// Maximum age of a request before it is treated as expired.
    request_timeout: Duration,
    /// `requeue_failed` drops an item once its attempt count reaches this value.
    max_retries: u32,
    /// Minimum time between two hand-outs of the same item.
    retry_interval: Duration,
    /// Timestamps of recently accepted requests, per target peer.
    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
    /// Maximum number of accepted requests per peer within `rate_limit_window`.
    max_relays_per_peer: usize,
    /// Length of the sliding window used by the rate limiter.
    rate_limit_window: Duration,
}
87
/// Counters describing address-discovery activity.
///
/// All counters start at zero (`Default`). The code that updates them lies
/// outside this part of the file, so the per-field descriptions below are
/// inferred from the field names and should be confirmed against it.
#[derive(Clone, Debug, Default)]
pub struct AddressDiscoveryStats {
    /// Presumably the number of address-discovery frames sent.
    pub frames_sent: u64,
    /// Presumably the number of address-discovery frames received.
    pub frames_received: u64,
    /// Presumably the number of addresses learned.
    pub addresses_discovered: u64,
    /// Presumably the number of observed address changes.
    pub address_changes_detected: u64,
}
100
/// Counters describing the endpoint's PunchMeNow relay activity.
///
/// All counters start at zero (`Default`).
#[derive(Clone, Debug, Default)]
pub struct RelayStats {
    /// Relay requests submitted to the endpoint, whatever their outcome.
    pub requests_received: u64,
    /// Requests for which handing the frame to a connection reported success.
    pub requests_relayed: u64,
    /// Delivery attempts that reported failure.
    pub requests_failed: u64,
    /// Requests rejected at enqueue time for a reason other than rate limiting.
    pub requests_dropped: u64,
    /// Requests removed from the queue because they exceeded the request timeout.
    pub requests_timed_out: u64,
    /// Requests rejected at enqueue time by the per-peer rate limiter.
    pub requests_rate_limited: u64,
    /// Queue length as of the most recent queue operation.
    pub current_queue_size: usize,
}
119
120impl RelayQueue {
121 fn new() -> Self {
123 Self {
124 pending: IndexMap::new(),
125 next_seq: 0,
126 max_queue_size: 1000, request_timeout: Duration::from_secs(30), max_retries: 3,
129 retry_interval: Duration::from_millis(500), rate_limiter: HashMap::new(),
131 max_relays_per_peer: 10, rate_limit_window: Duration::from_secs(60), }
134 }
135
136 fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
138 if self.pending.len() >= self.max_queue_size {
140 warn!(
141 "Relay queue full, dropping request for peer {:?}",
142 target_peer_id
143 );
144 return false;
145 }
146
147 if !self.check_rate_limit(target_peer_id, now) {
149 warn!(
150 "Rate limit exceeded for peer {:?}, dropping relay request",
151 target_peer_id
152 );
153 return false;
154 }
155
156 let item = RelayQueueItem {
157 target_peer_id,
158 frame,
159 created_at: now,
160 attempts: 0,
161 last_attempt: None,
162 };
163
164 let seq = self.next_seq;
165 self.next_seq += 1;
166 self.pending.insert(seq, item);
167
168 self.record_relay_request(target_peer_id, now);
170
171 trace!(
172 "Queued relay request for peer {:?}, queue size: {}",
173 target_peer_id,
174 self.pending.len()
175 );
176 true
177 }
178
179 fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
181 self.cleanup_rate_limiter(now);
183
184 if let Some(requests) = self.rate_limiter.get(&peer_id) {
186 requests.len() < self.max_relays_per_peer
187 } else {
188 true }
190 }
191
192 fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
194 self.rate_limiter.entry(peer_id).or_default().push_back(now);
195 }
196
197 fn cleanup_rate_limiter(&mut self, now: Instant) {
199 self.rate_limiter.retain(|_, requests| {
200 requests.retain(|&request_time| {
201 now.saturating_duration_since(request_time) <= self.rate_limit_window
202 });
203 !requests.is_empty()
204 });
205 }
206
207 fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
209 let mut expired_keys = Vec::new();
211 let mut ready_key = None;
212
213 for (seq, item) in &self.pending {
214 if now.saturating_duration_since(item.created_at) > self.request_timeout {
216 expired_keys.push(*seq);
217 continue;
218 }
219
220 if item.attempts == 0
222 || item
223 .last_attempt
224 .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
225 {
226 ready_key = Some(*seq);
227 break;
228 }
229 }
230
231 for key in expired_keys {
233 if let Some(expired) = self.pending.shift_remove(&key) {
234 debug!(
235 "Relay request for peer {:?} timed out after {:?}",
236 expired.target_peer_id,
237 now.saturating_duration_since(expired.created_at)
238 );
239 }
240 }
241
242 if let Some(key) = ready_key {
244 if let Some(mut item) = self.pending.shift_remove(&key) {
245 item.attempts += 1;
246 item.last_attempt = Some(now);
247 return Some(item);
248 }
249 }
250
251 None
252 }
253
254 fn requeue_failed(&mut self, item: RelayQueueItem) {
256 if item.attempts < self.max_retries {
257 trace!(
258 "Requeuing failed relay request for peer {:?}, attempt {}/{}",
259 item.target_peer_id, item.attempts, self.max_retries
260 );
261 let seq = self.next_seq;
262 self.next_seq += 1;
263 self.pending.insert(seq, item);
264 } else {
265 debug!(
266 "Dropping relay request for peer {:?} after {} failed attempts",
267 item.target_peer_id, item.attempts
268 );
269 }
270 }
271
272 fn cleanup_expired(&mut self, now: Instant) -> usize {
274 let initial_len = self.pending.len();
275
276 let expired_keys: Vec<u64> = self
278 .pending
279 .iter()
280 .filter_map(|(seq, item)| {
281 if now.saturating_duration_since(item.created_at) > self.request_timeout {
282 Some(*seq)
283 } else {
284 None
285 }
286 })
287 .collect();
288
289 for key in expired_keys {
291 if let Some(expired) = self.pending.shift_remove(&key) {
292 debug!(
293 "Removing expired relay request for peer {:?}",
294 expired.target_peer_id
295 );
296 }
297 }
298
299 initial_len - self.pending.len()
300 }
301
302 fn len(&self) -> usize {
304 self.pending.len()
305 }
306}
307
/// Endpoint-level state shared by all connections: datagram routing,
/// connection-ID issuance, buffering of not-yet-accepted connection attempts,
/// and the PunchMeNow relay queue.
pub struct Endpoint {
    /// Random source used for version-negotiation bytes, stateless-reset
    /// padding, retry tokens and transport parameters.
    rng: StdRng,
    /// Routing tables from connection IDs / reset tokens / initial DCIDs to
    /// connections or incoming buffers.
    index: ConnectionIndex,
    /// Per-connection metadata, keyed by the `usize` inside `ConnectionHandle`.
    connections: Slab<ConnectionMeta>,
    /// Generates and validates locally issued connection IDs.
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration.
    config: Arc<EndpointConfig>,
    /// Server configuration; `None` means incoming connection attempts are not
    /// served.
    server_config: Option<Arc<ServerConfig>>,
    /// Stored from the constructor; its consumer is outside this section.
    allow_mtud: bool,
    /// Time the last stateless reset was sent, used to enforce
    /// `min_reset_interval`.
    last_stateless_reset: Option<Instant>,
    /// Datagrams buffered for connection attempts that have not yet been
    /// accepted, refused, retried or ignored.
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` over all entries of `incoming_buffers`.
    all_incoming_buffers_total_bytes: u64,
    /// Maps a registered peer to the connection that reaches it.
    peer_connections: HashMap<PeerId, ConnectionHandle>,
    /// Relay requests that could not be delivered immediately.
    relay_queue: RelayQueue,
    /// Simple relay counters exposed through `relay_stats`.
    relay_stats: RelayStats,
    /// Detailed relay statistics collector.
    relay_stats_collector: RelayStatisticsCollector,
    /// Initialised to `true` by `new`; not read within this section.
    address_discovery_enabled: bool,
    /// Initialised to `None` by `new`; not invoked within this section.
    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
}
339
340impl Endpoint {
341 pub fn new(
352 config: Arc<EndpointConfig>,
353 server_config: Option<Arc<ServerConfig>>,
354 allow_mtud: bool,
355 rng_seed: Option<[u8; 32]>,
356 ) -> Self {
357 let rng_seed = rng_seed.or(config.rng_seed);
358 Self {
359 rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
360 index: ConnectionIndex::default(),
361 connections: Slab::new(),
362 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
363 config,
364 server_config,
365 allow_mtud,
366 last_stateless_reset: None,
367 incoming_buffers: Slab::new(),
368 all_incoming_buffers_total_bytes: 0,
369 peer_connections: HashMap::new(),
370 relay_queue: RelayQueue::new(),
371 relay_stats: RelayStats::default(),
372 relay_stats_collector: RelayStatisticsCollector::new(),
373 address_discovery_enabled: true, address_change_callback: None,
375 }
376 }
377
    /// Replaces the server configuration; `None` removes it.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
382
    /// Associates `peer_id` with `connection_handle`, replacing any previous
    /// mapping for that peer.
    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
        self.peer_connections.insert(peer_id, connection_handle);
        trace!(
            "Registered peer {:?} with connection {:?}",
            peer_id, connection_handle
        );
    }
391
392 pub fn unregister_peer(&mut self, peer_id: &PeerId) {
394 if let Some(handle) = self.peer_connections.remove(peer_id) {
395 trace!(
396 "Unregistered peer {:?} from connection {:?}",
397 peer_id, handle
398 );
399 }
400 }
401
    /// Returns the connection handle registered for `peer_id`, if any.
    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
        self.peer_connections.get(peer_id).copied()
    }
406
407 pub(crate) fn queue_frame_for_peer(
409 &mut self,
410 peer_id: &PeerId,
411 frame: frame::PunchMeNow,
412 ) -> bool {
413 self.relay_stats.requests_received += 1;
414
415 if let Some(ch) = self.lookup_peer_connection(peer_id) {
416 if self.relay_frame_to_connection(ch, frame.clone()) {
418 self.relay_stats.requests_relayed += 1;
419 self.relay_stats_collector.record_rate_limit(true);
421 trace!(
422 "Immediately relayed frame to peer {:?} via connection {:?}",
423 peer_id, ch
424 );
425 return true;
426 }
427 }
428
429 let now = Instant::now();
431 if self.relay_queue.enqueue(*peer_id, frame, now) {
432 self.relay_stats.current_queue_size = self.relay_queue.len();
433 self.relay_stats_collector.record_rate_limit(true);
435 trace!("Queued relay request for peer {:?}", peer_id);
436 true
437 } else {
438 if !self.relay_queue.check_rate_limit(*peer_id, now) {
440 self.relay_stats.requests_rate_limited += 1;
441 self.relay_stats_collector.record_rate_limit(false);
443 self.relay_stats_collector.record_error("rate_limited");
445 } else {
446 self.relay_stats.requests_dropped += 1;
447 self.relay_stats_collector
449 .record_error("resource_exhausted");
450 }
451 false
452 }
453 }
454
455 fn relay_frame_to_connection(
457 &mut self,
458 ch: ConnectionHandle,
459 frame: frame::PunchMeNow,
460 ) -> bool {
461 let _event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame));
463 if let Some(_conn) = self.connections.get_mut(ch.0) {
464 }
471 true
475 }
476
477 pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
479 if let Some(connection) = self.connections.get_mut(connection_handle.0) {
480 connection.peer_id = Some(peer_id);
481 self.register_peer(peer_id, connection_handle);
482
483 self.process_queued_relays_for_peer(peer_id);
485 }
486 }
487
488 fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
490 let _now = Instant::now();
491 let mut processed = 0;
492
493 let mut items_to_process = Vec::new();
495 let mut keys_to_remove = Vec::new();
496
497 for (seq, item) in &self.relay_queue.pending {
499 if item.target_peer_id == peer_id {
500 items_to_process.push(item.clone());
501 keys_to_remove.push(*seq);
502 }
503 }
504
505 for key in keys_to_remove {
507 self.relay_queue.pending.shift_remove(&key);
508 }
509
510 for item in items_to_process {
512 if let Some(ch) = self.lookup_peer_connection(&peer_id) {
513 if self.relay_frame_to_connection(ch, item.frame.clone()) {
514 self.relay_stats.requests_relayed += 1;
515 processed += 1;
516 trace!("Processed queued relay for peer {:?}", peer_id);
517 } else {
518 self.relay_queue.requeue_failed(item);
520 self.relay_stats.requests_failed += 1;
521 }
522 }
523 }
524
525 self.relay_stats.current_queue_size = self.relay_queue.len();
526
527 if processed > 0 {
528 debug!(
529 "Processed {} queued relay requests for peer {:?}",
530 processed, peer_id
531 );
532 }
533 }
534
535 pub fn process_relay_queue(&mut self) {
537 let now = Instant::now();
538 let mut processed = 0;
539 let mut failed = 0;
540
541 while let Some(item) = self.relay_queue.next_ready(now) {
543 if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
544 if self.relay_frame_to_connection(ch, item.frame.clone()) {
545 self.relay_stats.requests_relayed += 1;
546 processed += 1;
547 trace!(
548 "Successfully relayed frame to peer {:?}",
549 item.target_peer_id
550 );
551 } else {
552 self.relay_queue.requeue_failed(item);
554 self.relay_stats.requests_failed += 1;
555 self.relay_stats_collector
557 .record_error("connection_failure");
558 failed += 1;
559 }
560 } else {
561 self.relay_queue.requeue_failed(item);
563 self.relay_stats_collector.record_error("peer_not_found");
565 failed += 1;
566 }
567 }
568
569 let expired = self.relay_queue.cleanup_expired(now);
571 if expired > 0 {
572 self.relay_stats.requests_timed_out += expired as u64;
573 for _ in 0..expired {
575 self.relay_stats_collector.record_error("request_timeout");
576 }
577 debug!("Cleaned up {} expired relay requests", expired);
578 }
579
580 self.relay_stats.current_queue_size = self.relay_queue.len();
581
582 if processed > 0 || failed > 0 {
583 trace!(
584 "Relay queue processing: {} processed, {} failed, {} in queue",
585 processed,
586 failed,
587 self.relay_queue.len()
588 );
589 }
590 }
591
    /// Returns the endpoint's simple relay counters.
    pub fn relay_stats(&self) -> &RelayStats {
        &self.relay_stats
    }
596
    /// Pushes the current queue counters into the statistics collector and
    /// returns the statistics it collects.
    pub fn comprehensive_relay_stats(&self) -> crate::relay::RelayStatistics {
        self.relay_stats_collector
            .update_queue_stats(&self.relay_stats);
        self.relay_stats_collector.collect_statistics()
    }
604
    /// Returns the relay statistics collector.
    pub fn relay_stats_collector(&self) -> &RelayStatisticsCollector {
        &self.relay_stats_collector
    }
609
    /// Returns the number of relay requests currently queued.
    pub fn relay_queue_len(&self) -> usize {
        self.relay_queue.len()
    }
614
615 pub fn handle_event(
619 &mut self,
620 ch: ConnectionHandle,
621 event: EndpointEvent,
622 ) -> Option<ConnectionEvent> {
623 use EndpointEventInner::*;
624 match event.0 {
625 EndpointEventInner::NeedIdentifiers(now, n) => {
626 return Some(self.send_new_identifiers(now, ch, n));
627 }
628 ResetToken(remote, token) => {
629 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
630 self.index.connection_reset_tokens.remove(old.0, old.1);
631 }
632 if self.index.connection_reset_tokens.insert(remote, token, ch) {
633 warn!("duplicate reset token");
634 }
635 }
636 RetireConnectionId(now, seq, allow_more_cids) => {
637 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
638 trace!("peer retired CID {}: {}", seq, cid);
639 self.index.retire(cid);
640 if allow_more_cids {
641 return Some(self.send_new_identifiers(now, ch, 1));
642 }
643 }
644 }
645 RelayPunchMeNow(target_peer_id, punch_me_now) => {
646 let peer_id = PeerId(target_peer_id);
648 if self.queue_frame_for_peer(&peer_id, punch_me_now) {
649 trace!(
650 "Successfully queued PunchMeNow frame for relay to peer {:?}",
651 peer_id
652 );
653 } else {
654 warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
655 }
656 }
657 SendAddressFrame(add_address_frame) => {
658 return Some(ConnectionEvent(ConnectionEventInner::QueueAddAddress(
660 add_address_frame,
661 )));
662 }
663 NatCandidateValidated { address, challenge } => {
664 trace!(
666 "NAT candidate validation succeeded for {} with challenge {:016x}",
667 address, challenge
668 );
669
670 debug!("NAT candidate {} validated successfully", address);
674 }
675 TryConnectTo {
676 request_id,
677 target_address,
678 timeout_ms,
679 requester_connection,
680 requested_at: _,
681 } => {
682 trace!(
686 "TryConnectTo request received: request_id={}, target={}, timeout={}ms, from={}",
687 request_id, target_address, timeout_ms, requester_connection
688 );
689
690 debug!(
695 "TryConnectTo: endpoint received callback request for {}",
696 target_address
697 );
698
699 }
703 Drained => {
704 if let Some(conn) = self.connections.try_remove(ch.0) {
705 self.index.remove(&conn);
706 if let Some(peer_id) = conn.peer_id {
708 self.peer_connections.remove(&peer_id);
709 trace!("Cleaned up peer connection mapping for {:?}", peer_id);
710 }
711 } else {
712 error!(id = ch.0, "unknown connection drained");
716 }
717 }
718 }
719 None
720 }
721
    /// Processes one incoming UDP datagram.
    ///
    /// Depending on how the datagram routes, this returns an event for an
    /// existing connection, a new connection attempt, or a response to send
    /// (version negotiation, close or stateless reset). Responses are written
    /// into `buf`. Returns `None` when the datagram is dropped or merely
    /// buffered.
    pub fn handle(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        ecn: Option<EcnCodepoint>,
        data: BytesMut,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let datagram_len = data.len();
        // Decode just enough of the first packet to route the datagram.
        let event = match PartialDecode::new(
            data,
            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
            &self.config.supported_versions,
            self.config.grease_quic_bit,
        ) {
            Ok((first_decode, remaining)) => DatagramConnectionEvent {
                now,
                remote,
                ecn,
                first_decode,
                remaining,
            },
            Err(PacketDecodeError::UnsupportedVersion {
                src_cid,
                dst_cid,
                version,
            }) => {
                // Only an endpoint with a server config answers with version
                // negotiation.
                if self.server_config.is_none() {
                    debug!("dropping packet with unsupported version");
                    return None;
                }
                trace!("sending version negotiation");
                Header::VersionNegotiate {
                    // Random first byte with bit 0x40 forced on.
                    random: self.rng.r#gen::<u8>() | 0x40,
                    // Connection IDs are echoed back swapped.
                    src_cid: dst_cid,
                    dst_cid: src_cid,
                }
                .encode(buf);
                // Lead the version list with a placeholder that differs from
                // the version the sender used (presumably a reserved greasing
                // version; confirm against RFC 9000).
                buf.write::<u32>(match version {
                    0x0a1a_2a3a => 0x0a1a_2a4a,
                    _ => 0x0a1a_2a3a,
                });
                for &version in &self.config.supported_versions {
                    buf.write(version);
                }
                return Some(DatagramEvent::Response(Transmit {
                    destination: remote,
                    ecn: None,
                    size: buf.len(),
                    segment_size: None,
                    src_ip: local_ip,
                }));
            }
            Err(e) => {
                trace!("malformed header: {}", e);
                return None;
            }
        };

        let addresses = FourTuple { remote, local_ip };
        let dst_cid = event.first_decode.dst_cid();

        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
            match route_to {
                // Belongs to a connection attempt that has not been accepted
                // yet: buffer it, subject to the per-attempt and global limits.
                RouteDatagramTo::Incoming(incoming_idx) => {
                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
                    let Some(config) = &self.server_config else {
                        debug!("no server config available to buffer incoming datagram");
                        return None;
                    };

                    // A datagram that would exceed either limit is silently
                    // dropped.
                    if incoming_buffer
                        .total_bytes
                        .checked_add(datagram_len as u64)
                        .is_some_and(|n| n <= config.incoming_buffer_size)
                        && self
                            .all_incoming_buffers_total_bytes
                            .checked_add(datagram_len as u64)
                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
                    {
                        incoming_buffer.datagrams.push(event);
                        incoming_buffer.total_bytes += datagram_len as u64;
                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
                    }

                    None
                }
                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
                    ch,
                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
                )),
            }
        } else if event.first_decode.initial_header().is_some() {
            // Unroutable Initial packet: potentially a new connection.
            self.handle_first_packet(datagram_len, event, addresses, buf)
        } else if event.first_decode.has_long_header() {
            debug!(
                "ignoring non-initial packet for unknown connection {}",
                dst_cid
            );
            None
        } else if !event.first_decode.is_initial()
            && self.local_cid_generator.validate(dst_cid).is_err()
        {
            // The CID generator does not recognise this CID, so no reset is sent.
            debug!("dropping packet with invalid CID");
            None
        } else if dst_cid.is_empty() {
            trace!("dropping unrecognized short packet without ID");
            None
        } else {
            // Short-header packet for an unknown connection.
            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
                .map(DatagramEvent::Response)
        }
    }
846
    /// Builds a stateless reset for `dst_cid` in `buf`, in response to a
    /// datagram of `inciting_dgram_len` bytes.
    ///
    /// Returns `None`, sending nothing, if a reset was sent less than
    /// `min_reset_interval` ago or if the inciting datagram is too small for
    /// the reset to be strictly shorter than it.
    fn stateless_reset(
        &mut self,
        now: Instant,
        inciting_dgram_len: usize,
        addresses: FourTuple,
        dst_cid: ConnectionId,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        // Rate-limit resets endpoint-wide.
        if self
            .last_stateless_reset
            .is_some_and(|last| last + self.config.min_reset_interval > now)
        {
            debug!("ignoring unexpected packet within minimum stateless reset interval");
            return None;
        }

        // Minimum number of bytes that must precede the reset token.
        const MIN_PADDING_LEN: usize = 5;

        // Padding is capped one byte below the space left after the token, so
        // the reset is always shorter than the datagram that triggered it.
        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
            _ => {
                debug!(
                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
                    inciting_dgram_len
                );
                return None;
            }
        };

        debug!(
            "sending stateless reset for {} to {}",
            dst_cid, addresses.remote
        );
        self.last_stateless_reset = Some(now);
        // Preferred lower bound on padding: room for a maximum-size CID.
        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
        // Use a random length when there is room, otherwise all there is.
        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
            max_padding_len
        } else {
            self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
        };
        buf.reserve(padding_len + RESET_TOKEN_SIZE);
        buf.resize(padding_len, 0);
        self.rng.fill_bytes(&mut buf[0..padding_len]);
        // Force the top two bits of the first byte to `01`, keeping the
        // remaining six bits random.
        buf[0] = 0b0100_0000 | (buf[0] >> 2);
        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));

        debug_assert!(buf.len() < inciting_dgram_len);

        Some(Transmit {
            destination: addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: addresses.local_ip,
        })
    }
907
908 pub fn connect(
910 &mut self,
911 now: Instant,
912 config: ClientConfig,
913 remote: SocketAddr,
914 server_name: &str,
915 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
916 if self.cids_exhausted() {
917 return Err(ConnectError::CidsExhausted);
918 }
919 if remote.port() == 0 || remote.ip().is_unspecified() {
920 return Err(ConnectError::InvalidRemoteAddress(remote));
921 }
922 if !self.config.supported_versions.contains(&config.version) {
923 return Err(ConnectError::UnsupportedVersion);
924 }
925
926 let remote_id = (config.initial_dst_cid_provider)();
927 trace!(initial_dcid = %remote_id);
928
929 let ch = ConnectionHandle(self.connections.vacant_key());
930 let loc_cid = self.new_cid(ch);
931 let params = TransportParameters::new(
932 &config.transport,
933 &self.config,
934 self.local_cid_generator.as_ref(),
935 loc_cid,
936 None,
937 &mut self.rng,
938 )?;
939 let tls = config
940 .crypto
941 .start_session(config.version, server_name, ¶ms)?;
942
943 let conn = self.add_connection(
944 ch,
945 config.version,
946 remote_id,
947 loc_cid,
948 remote_id,
949 FourTuple {
950 remote,
951 local_ip: None,
952 },
953 now,
954 tls,
955 config.transport,
956 SideArgs::Client {
957 token_store: config.token_store,
958 server_name: server_name.into(),
959 },
960 );
961 Ok((ch, conn))
962 }
963
964 fn send_new_identifiers(
965 &mut self,
966 now: Instant,
967 ch: ConnectionHandle,
968 num: u64,
969 ) -> ConnectionEvent {
970 let mut ids = vec![];
971 for _ in 0..num {
972 let id = self.new_cid(ch);
973 let meta = &mut self.connections[ch];
974 let sequence = meta.cids_issued;
975 meta.cids_issued += 1;
976 meta.loc_cids.insert(sequence, id);
977 ids.push(IssuedCid {
978 sequence,
979 id,
980 reset_token: ResetToken::new(&*self.config.reset_key, id),
981 });
982 }
983 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
984 }
985
986 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
988 loop {
989 let cid = self.local_cid_generator.generate_cid();
990 if cid.is_empty() {
991 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
993 return cid;
994 }
995 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
996 e.insert(ch);
997 break cid;
998 }
999 }
1000 }
1001
    /// Handles an Initial packet that does not route to any known connection.
    ///
    /// On success, allocates an incoming buffer, registers the packet's
    /// destination CID in the index, and returns `DatagramEvent::NewConnection`
    /// for the application to accept, refuse, retry or ignore. Otherwise
    /// returns a response to send (a close, or a stateless reset when no
    /// server config is set) or `None` when the packet is dropped.
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        addresses: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        let Some(header) = event.first_decode.initial_header() else {
            debug!(
                "unable to extract initial header for connection {}",
                dst_cid
            );
            return None;
        };

        // Derive the Initial keys; this needs a server config.
        let crypto = {
            let Some(server_config) = &self.server_config else {
                debug!("packet for unrecognized connection {}", dst_cid);
                return self
                    .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
                    .map(DatagramEvent::Response);
            };
            // Datagrams shorter than `MIN_INITIAL_SIZE` are not answered.
            if datagram_len < MIN_INITIAL_SIZE as usize {
                debug!("ignoring short initial for connection {}", dst_cid);
                return None;
            }
            match server_config.crypto.initial_keys(header.version, dst_cid) {
                Ok(keys) => keys,
                Err(UnsupportedVersion) => {
                    debug!(
                        "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                        header.version
                    );
                    return None;
                }
            }
        };

        // Checks that only need the still-protected header; failures are
        // answered with a close.
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                addresses,
                &crypto,
                &header.src_cid,
                reason,
                buf,
            )));
        }

        // Finish decoding the packet using the remote header key.
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        let Header::Initial(header) = packet.header else {
            debug!("unexpected non-initial packet in handle_first_packet()");
            return None;
        };

        // NOTE(review): the `None` arm looks unreachable in practice, since
        // the server config was present when the keys were derived above; it
        // reuses the invalid-token log message and close reason.
        let token = match self.server_config.as_ref() {
            Some(sc) => match IncomingToken::from_header(&header, sc, addresses.remote) {
                Ok(token) => token,
                Err(InvalidRetryTokenError) => {
                    debug!("rejecting invalid retry token");
                    return Some(DatagramEvent::Response(self.initial_close(
                        header.version,
                        addresses,
                        &crypto,
                        &header.src_cid,
                        TransportError::INVALID_TOKEN(""),
                        buf,
                    )));
                }
            },
            None => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    addresses,
                    &crypto,
                    &header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // Route later datagrams carrying this DCID into the new buffer until
        // the application decides what to do with the attempt.
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            addresses,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
1118
1119 #[allow(clippy::result_large_err)]
1122 pub fn accept(
1123 &mut self,
1124 mut incoming: Incoming,
1125 now: Instant,
1126 buf: &mut Vec<u8>,
1127 server_config: Option<Arc<ServerConfig>>,
1128 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1129 let remote_address_validated = incoming.remote_address_validated();
1130 incoming.improper_drop_warner.dismiss();
1131 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1132 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1133
1134 let packet_number = incoming.packet.header.number.expand(0);
1135 let InitialHeader {
1136 src_cid,
1137 dst_cid,
1138 version,
1139 ..
1140 } = incoming.packet.header;
1141 let server_config = match server_config.or_else(|| self.server_config.clone()) {
1142 Some(sc) => sc,
1143 None => {
1144 return Err(AcceptError {
1145 cause: ConnectionError::TransportError(
1146 crate::transport_error::Error::INTERNAL_ERROR(""),
1147 ),
1148 response: None,
1149 });
1150 }
1151 };
1152
1153 if server_config
1154 .transport
1155 .max_idle_timeout
1156 .is_some_and(|timeout| {
1157 incoming.received_at + Duration::from_millis(timeout.into()) <= now
1158 })
1159 {
1160 debug!("abandoning accept of stale initial");
1161 self.index.remove_initial(dst_cid);
1162 return Err(AcceptError {
1163 cause: ConnectionError::TimedOut,
1164 response: None,
1165 });
1166 }
1167
1168 if self.cids_exhausted() {
1169 debug!("refusing connection");
1170 self.index.remove_initial(dst_cid);
1171 return Err(AcceptError {
1172 cause: ConnectionError::CidsExhausted,
1173 response: Some(self.initial_close(
1174 version,
1175 incoming.addresses,
1176 &incoming.crypto,
1177 &src_cid,
1178 TransportError::CONNECTION_REFUSED(""),
1179 buf,
1180 )),
1181 });
1182 }
1183
1184 if incoming
1185 .crypto
1186 .packet
1187 .remote
1188 .decrypt(
1189 packet_number,
1190 &incoming.packet.header_data,
1191 &mut incoming.packet.payload,
1192 )
1193 .is_err()
1194 {
1195 debug!(packet_number, "failed to authenticate initial packet");
1196 self.index.remove_initial(dst_cid);
1197 return Err(AcceptError {
1198 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1199 response: None,
1200 });
1201 };
1202
1203 let ch = ConnectionHandle(self.connections.vacant_key());
1204 let loc_cid = self.new_cid(ch);
1205 let mut params = TransportParameters::new(
1206 &server_config.transport,
1207 &self.config,
1208 self.local_cid_generator.as_ref(),
1209 loc_cid,
1210 Some(&server_config),
1211 &mut self.rng,
1212 )?;
1213 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1214 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1215 params.retry_src_cid = incoming.token.retry_src_cid;
1216 let mut pref_addr_cid = None;
1217 if server_config.has_preferred_address() {
1218 let cid = self.new_cid(ch);
1219 pref_addr_cid = Some(cid);
1220 params.preferred_address = Some(PreferredAddress {
1221 address_v4: server_config.preferred_address_v4,
1222 address_v6: server_config.preferred_address_v6,
1223 connection_id: cid,
1224 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1225 });
1226 }
1227
1228 let tls = server_config.crypto.clone().start_session(version, ¶ms);
1229 let transport_config = server_config.transport.clone();
1230 let mut conn = self.add_connection(
1231 ch,
1232 version,
1233 dst_cid,
1234 loc_cid,
1235 src_cid,
1236 incoming.addresses,
1237 incoming.received_at,
1238 tls,
1239 transport_config,
1240 SideArgs::Server {
1241 server_config,
1242 pref_addr_cid,
1243 path_validated: remote_address_validated,
1244 },
1245 );
1246 self.index.insert_initial(dst_cid, ch);
1247
1248 match conn.handle_first_packet(
1249 incoming.received_at,
1250 incoming.addresses.remote,
1251 incoming.ecn,
1252 packet_number,
1253 incoming.packet,
1254 incoming.rest,
1255 ) {
1256 Ok(()) => {
1257 trace!(id = ch.0, icid = %dst_cid, "new connection");
1258
1259 for event in incoming_buffer.datagrams {
1260 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1261 }
1262
1263 Ok((ch, conn))
1264 }
1265 Err(e) => {
1266 debug!("handshake failed: {}", e);
1267 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1268 let response = match e {
1269 ConnectionError::TransportError(ref e) => Some(self.initial_close(
1270 version,
1271 incoming.addresses,
1272 &incoming.crypto,
1273 &src_cid,
1274 e.clone(),
1275 buf,
1276 )),
1277 _ => None,
1278 };
1279 Err(AcceptError { cause: e, response })
1280 }
1281 }
1282 }
1283
1284 fn early_validate_first_packet(
1286 &mut self,
1287 header: &ProtectedInitialHeader,
1288 ) -> Result<(), TransportError> {
1289 let Some(config) = &self.server_config else {
1290 return Err(TransportError::INTERNAL_ERROR(""));
1291 };
1292 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1293 return Err(TransportError::CONNECTION_REFUSED(""));
1294 }
1295
1296 if header.dst_cid.len() < 8
1301 && (header.token_pos.is_empty()
1302 || header.dst_cid.len() != self.local_cid_generator.cid_len())
1303 {
1304 debug!(
1305 "rejecting connection due to invalid DCID length {}",
1306 header.dst_cid.len()
1307 );
1308 return Err(TransportError::PROTOCOL_VIOLATION(
1309 "invalid destination CID length",
1310 ));
1311 }
1312
1313 Ok(())
1314 }
1315
    /// Rejects the connection attempt `incoming`, writing a
    /// `CONNECTION_REFUSED` close into `buf` and returning the transmit
    /// describing it.
    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
        // Release the attempt's index entry and buffered datagrams first.
        self.clean_up_incoming(&incoming);
        incoming.improper_drop_warner.dismiss();

        self.initial_close(
            incoming.packet.header.version,
            incoming.addresses,
            &incoming.crypto,
            &incoming.packet.header.src_cid,
            TransportError::CONNECTION_REFUSED(""),
            buf,
        )
    }
1330
1331 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1335 if !incoming.may_retry() {
1336 return Err(RetryError(Box::new(incoming)));
1337 }
1338
1339 let Some(server_config_arc) = self.server_config.clone() else {
1340 return Err(RetryError(Box::new(incoming)));
1341 };
1342
1343 self.clean_up_incoming(&incoming);
1344 incoming.improper_drop_warner.dismiss();
1345
1346 let loc_cid = self.local_cid_generator.generate_cid();
1353
1354 let payload = TokenPayload::Retry {
1355 address: incoming.addresses.remote,
1356 orig_dst_cid: incoming.packet.header.dst_cid,
1357 issued: server_config_arc.time_source.now(),
1358 };
1359 let token = Token::new(payload, &mut self.rng).encode(&*server_config_arc.token_key);
1360
1361 let header = Header::Retry {
1362 src_cid: loc_cid,
1363 dst_cid: incoming.packet.header.src_cid,
1364 version: incoming.packet.header.version,
1365 };
1366
1367 let encode = header.encode(buf);
1368 buf.put_slice(&token);
1369 buf.extend_from_slice(&server_config_arc.crypto.retry_tag(
1370 incoming.packet.header.version,
1371 &incoming.packet.header.dst_cid,
1372 buf,
1373 ));
1374 encode.finish(buf, &*incoming.crypto.header.local, None);
1375
1376 Ok(Transmit {
1377 destination: incoming.addresses.remote,
1378 ecn: None,
1379 size: buf.len(),
1380 segment_size: None,
1381 src_ip: incoming.addresses.local_ip,
1382 })
1383 }
1384
    /// Ignore this incoming connection attempt without producing any response.
    ///
    /// Releases the endpoint state held for `incoming` and dismisses its drop warning.
    pub fn ignore(&mut self, incoming: Incoming) {
        self.clean_up_incoming(&incoming);
        incoming.improper_drop_warner.dismiss();
    }
1393
1394 fn clean_up_incoming(&mut self, incoming: &Incoming) {
1396 self.index.remove_initial(incoming.packet.header.dst_cid);
1397 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1398 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1399 }
1400
1401 fn add_connection(
1402 &mut self,
1403 ch: ConnectionHandle,
1404 version: u32,
1405 init_cid: ConnectionId,
1406 loc_cid: ConnectionId,
1407 rem_cid: ConnectionId,
1408 addresses: FourTuple,
1409 now: Instant,
1410 tls: Box<dyn crypto::Session>,
1411 transport_config: Arc<TransportConfig>,
1412 side_args: SideArgs,
1413 ) -> Connection {
1414 let mut rng_seed = [0; 32];
1415 self.rng.fill_bytes(&mut rng_seed);
1416 let side = side_args.side();
1417 let pref_addr_cid = side_args.pref_addr_cid();
1418 let conn = Connection::new(
1419 self.config.clone(),
1420 transport_config,
1421 init_cid,
1422 loc_cid,
1423 rem_cid,
1424 addresses.remote,
1425 addresses.local_ip,
1426 tls,
1427 self.local_cid_generator.as_ref(),
1428 now,
1429 version,
1430 self.allow_mtud,
1431 rng_seed,
1432 side_args,
1433 );
1434
1435 let mut cids_issued = 0;
1436 let mut loc_cids = FxHashMap::default();
1437
1438 loc_cids.insert(cids_issued, loc_cid);
1439 cids_issued += 1;
1440
1441 if let Some(cid) = pref_addr_cid {
1442 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1443 loc_cids.insert(cids_issued, cid);
1444 cids_issued += 1;
1445 }
1446
1447 let id = self.connections.insert(ConnectionMeta {
1448 init_cid,
1449 cids_issued,
1450 loc_cids,
1451 addresses,
1452 side,
1453 reset_token: None,
1454 peer_id: None,
1455 });
1456 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1457
1458 self.index.insert_conn(addresses, loc_cid, ch, side);
1459
1460 conn
1461 }
1462
1463 fn initial_close(
1464 &mut self,
1465 version: u32,
1466 addresses: FourTuple,
1467 crypto: &Keys,
1468 remote_id: &ConnectionId,
1469 reason: TransportError,
1470 buf: &mut Vec<u8>,
1471 ) -> Transmit {
1472 let local_id = self.local_cid_generator.generate_cid();
1476 let number = PacketNumber::U8(0);
1477 let header = Header::Initial(InitialHeader {
1478 dst_cid: *remote_id,
1479 src_cid: local_id,
1480 number,
1481 token: Bytes::new(),
1482 version,
1483 });
1484
1485 let partial_encode = header.encode(buf);
1486 let max_len =
1487 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1488 frame::Close::from(reason).encode(buf, max_len);
1489 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1490 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1491 Transmit {
1492 destination: addresses.remote,
1493 ecn: None,
1494 size: buf.len(),
1495 segment_size: None,
1496 src_ip: addresses.local_ip,
1497 }
1498 }
1499
    /// Access the configuration used by this endpoint.
    pub fn config(&self) -> &EndpointConfig {
        &self.config
    }
1504
    /// Set the endpoint-level address discovery flag.
    ///
    /// NOTE(review): this only stores the flag; nothing here propagates it to connections that
    /// already exist — confirm how it is consumed elsewhere.
    pub fn enable_address_discovery(&mut self, enabled: bool) {
        self.address_discovery_enabled = enabled;
    }
1516
    /// Whether the endpoint-level address discovery flag is currently set.
    pub fn address_discovery_enabled(&self) -> bool {
        self.address_discovery_enabled
    }
1521
    /// Addresses discovered for this endpoint.
    ///
    /// Currently a placeholder: it always returns an empty `Vec`.
    /// NOTE(review): presumably discovered addresses are tracked per connection rather than on
    /// the endpoint — confirm before relying on this accessor.
    pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
        Vec::new()
    }
1533
    /// Install `callback` as the endpoint's address change callback, replacing any previous one.
    ///
    /// The callback receives `(Option<SocketAddr>, SocketAddr)` — presumably the previous
    /// address (if any) followed by the new one; verify against the code that invokes it.
    pub fn set_address_change_callback<F>(&mut self, callback: F)
    where
        F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
    {
        self.address_change_callback = Some(Box::new(callback));
    }
1544
    /// Remove the address change callback, if one is installed.
    pub fn clear_address_change_callback(&mut self) {
        self.address_change_callback = None;
    }
1549
    /// Address discovery statistics for this endpoint.
    ///
    /// Currently a placeholder: it always returns `AddressDiscoveryStats::default()`.
    pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
        AddressDiscoveryStats::default()
    }
1558
    /// Number of connections currently tracked by this endpoint.
    pub fn open_connections(&self) -> usize {
        self.connections.len()
    }
1563
    /// Total number of bytes currently accounted for across all incoming buffers.
    pub fn incoming_buffer_bytes(&self) -> u64 {
        self.all_incoming_buffers_total_bytes
    }
1569
    /// Test helper: number of tracked connections, with debug-build consistency checks against
    /// the routing index.
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn known_connections(&self) -> usize {
        let x = self.connections.len();
        debug_assert_eq!(x, self.index.connection_ids_initial.len());
        // The remaining tables may hold fewer entries than there are connections.
        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
        debug_assert!(x >= self.index.incoming_connection_remotes.len());
        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
        x
    }
1582
    /// Test helper: number of locally issued CIDs currently present in the routing index.
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn known_cids(&self) -> usize {
        self.index.connection_ids.len()
    }
1588
1589 fn cids_exhausted(&self) -> bool {
1594 self.local_cid_generator.cid_len() <= 4
1595 && self.local_cid_generator.cid_len() != 0
1596 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1597 - self.index.connection_ids.len())
1598 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1599 }
1600}
1601
1602impl fmt::Debug for Endpoint {
1603 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1604 fmt.debug_struct("Endpoint")
1605 .field("rng", &self.rng)
1606 .field("index", &self.index)
1607 .field("connections", &self.connections)
1608 .field("config", &self.config)
1609 .field("server_config", &self.server_config)
1610 .field("incoming_buffers.len", &self.incoming_buffers.len())
1612 .field(
1613 "all_incoming_buffers_total_bytes",
1614 &self.all_incoming_buffers_total_bytes,
1615 )
1616 .finish()
1617 }
1618}
1619
/// Datagrams buffered for a pending `Incoming`, replayed into the connection once it is
/// accepted.
#[derive(Default)]
struct IncomingBuffer {
    /// Buffered datagram events, in arrival order.
    datagrams: Vec<DatagramConnectionEvent>,
    /// Byte total for this buffer; subtracted from the endpoint-wide counter on removal.
    total_bytes: u64,
}
1626
/// Destination a received datagram should be routed to.
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// A pending `Incoming`, identified by its key in the endpoint's incoming buffers.
    Incoming(usize),
    /// An existing connection.
    Connection(ConnectionHandle),
}
1633
/// Maps packets to existing connections or pending incoming attempts.
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Routes by the destination CID of the peer's first Initial; consulted only for Initial
    /// and 0-RTT packets.
    ///
    /// Keys are peer-chosen, which is presumably why this uses the standard (DoS-resistant)
    /// hasher rather than `FxHashMap`.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Routes by locally issued, non-empty CIDs.
    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
    /// Server-side connections using zero-length local CIDs, keyed by the full four-tuple.
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Client-side connections using zero-length local CIDs, keyed by remote address only.
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens per remote address, matched against the tail of packets that no other
    /// table could route.
    connection_reset_tokens: ResetTokenTable,
}
1666
1667impl ConnectionIndex {
1668 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1670 if dst_cid.is_empty() {
1671 return;
1672 }
1673 self.connection_ids_initial
1674 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1675 }
1676
1677 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1679 if dst_cid.is_empty() {
1680 return;
1681 }
1682 let removed = self.connection_ids_initial.remove(&dst_cid);
1683 debug_assert!(removed.is_some());
1684 }
1685
1686 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1688 if dst_cid.is_empty() {
1689 return;
1690 }
1691 self.connection_ids_initial
1692 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1693 }
1694
1695 fn insert_conn(
1698 &mut self,
1699 addresses: FourTuple,
1700 dst_cid: ConnectionId,
1701 connection: ConnectionHandle,
1702 side: Side,
1703 ) {
1704 match dst_cid.len() {
1705 0 => match side {
1706 Side::Server => {
1707 self.incoming_connection_remotes
1708 .insert(addresses, connection);
1709 }
1710 Side::Client => {
1711 self.outgoing_connection_remotes
1712 .insert(addresses.remote, connection);
1713 }
1714 },
1715 _ => {
1716 self.connection_ids.insert(dst_cid, connection);
1717 }
1718 }
1719 }
1720
1721 fn retire(&mut self, dst_cid: ConnectionId) {
1723 self.connection_ids.remove(&dst_cid);
1724 }
1725
1726 fn remove(&mut self, conn: &ConnectionMeta) {
1728 if conn.side.is_server() {
1729 self.remove_initial(conn.init_cid);
1730 }
1731 for cid in conn.loc_cids.values() {
1732 self.connection_ids.remove(cid);
1733 }
1734 self.incoming_connection_remotes.remove(&conn.addresses);
1735 self.outgoing_connection_remotes
1736 .remove(&conn.addresses.remote);
1737 if let Some((remote, token)) = conn.reset_token {
1738 self.connection_reset_tokens.remove(remote, token);
1739 }
1740 }
1741
1742 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1744 let dst_cid = datagram.dst_cid();
1745 let is_empty_cid = dst_cid.is_empty();
1746
1747 if !is_empty_cid {
1749 if let Some(&ch) = self.connection_ids.get(dst_cid) {
1750 return Some(RouteDatagramTo::Connection(ch));
1751 }
1752 }
1753
1754 if datagram.is_initial() || datagram.is_0rtt() {
1756 if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1757 return Some(ch);
1758 }
1759 }
1760
1761 if is_empty_cid {
1763 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1765 return Some(RouteDatagramTo::Connection(ch));
1766 }
1767 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1768 return Some(RouteDatagramTo::Connection(ch));
1769 }
1770 }
1771
1772 let data = datagram.data();
1774 if data.len() < RESET_TOKEN_SIZE {
1775 return None;
1776 }
1777 self.connection_reset_tokens
1778 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1779 .cloned()
1780 .map(RouteDatagramTo::Connection)
1781 }
1782}
1783
/// Endpoint-side bookkeeping for a single connection.
#[derive(Debug)]
pub(crate) struct ConnectionMeta {
    /// CID under which the connection's initial route is registered; removed from the
    /// initial-DCID table for server-side connections.
    init_cid: ConnectionId,
    /// Number of local CIDs issued so far; also used as the next sequence number.
    cids_issued: u64,
    /// Local CIDs keyed by sequence number.
    loc_cids: FxHashMap<u64, ConnectionId>,
    /// Addresses the connection was registered with; used for zero-length-CID routing.
    addresses: FourTuple,
    /// Which side of the connection the local endpoint is on.
    side: Side,
    /// Remote address and reset token registered in the index's reset-token table, if any.
    reset_token: Option<(SocketAddr, ResetToken)>,
    /// Peer identifier associated with this connection, if known (initially `None`).
    peer_id: Option<PeerId>,
}
1802
/// Identifier for a connection tracked by an endpoint; wraps the key of its entry in the
/// endpoint's connection slab.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1806
/// Unwraps a handle into its raw slab key.
impl From<ConnectionHandle> for usize {
    fn from(x: ConnectionHandle) -> Self {
        x.0
    }
}
1812
/// Allows `slab[handle]` in place of `slab[handle.0]`.
impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
    type Output = ConnectionMeta;
    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
        // Delegates to the slab's own `usize` indexing.
        &self[ch.0]
    }
}
1819
/// Mutable counterpart of the `Index<ConnectionHandle>` implementation.
impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
        // Delegates to the slab's own `usize` indexing.
        &mut self[ch.0]
    }
}
1825
/// Outcome of processing a single received datagram.
pub enum DatagramEvent {
    /// An event to be delivered to the existing connection identified by the handle.
    ConnectionEvent(ConnectionHandle, ConnectionEvent),
    /// A new incoming connection attempt awaiting a decision from the caller.
    NewConnection(Incoming),
    /// A packet produced directly by the endpoint, to be transmitted.
    Response(Transmit),
}
1835
/// An incoming connection attempt that has not yet been accepted, refused, retried or ignored.
///
/// Must be passed to one of `Endpoint::accept/refuse/retry/ignore`; otherwise a warning is
/// logged when it is dropped.
pub struct Incoming {
    /// When the first packet was received.
    received_at: Instant,
    /// Remote address and local IP the packet arrived on.
    addresses: FourTuple,
    /// ECN codepoint of the first packet, if any.
    ecn: Option<EcnCodepoint>,
    /// The decoded Initial packet that started this attempt.
    packet: InitialPacket,
    /// Remaining bytes of the datagram after the Initial packet, if any.
    rest: Option<BytesMut>,
    /// Keys used to protect packets for this attempt.
    crypto: Keys,
    /// Token information taken from the Initial packet.
    token: IncomingToken,
    /// Key of this attempt's buffer in the endpoint's incoming buffers.
    incoming_idx: usize,
    /// Logs a warning if this value is dropped without being handed back to the endpoint.
    improper_drop_warner: IncomingImproperDropWarner,
}
1848
impl Incoming {
    /// The local IP address the packet was received on, if known.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.addresses.local_ip
    }

    /// The peer's UDP address.
    pub fn remote_address(&self) -> SocketAddr {
        self.addresses.remote
    }

    /// Whether the token carried by this attempt marks the remote address as validated.
    pub fn remote_address_validated(&self) -> bool {
        self.token.validated
    }

    /// Whether this attempt may be answered with a Retry packet.
    ///
    /// True when the token carries no retry source CID.
    pub fn may_retry(&self) -> bool {
        self.token.retry_src_cid.is_none()
    }

    /// The original destination CID recorded in the token.
    pub fn orig_dst_cid(&self) -> &ConnectionId {
        &self.token.orig_dst_cid
    }
}
1886
1887impl fmt::Debug for Incoming {
1888 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1889 f.debug_struct("Incoming")
1890 .field("addresses", &self.addresses)
1891 .field("ecn", &self.ecn)
1892 .field("token", &self.token)
1895 .field("incoming_idx", &self.incoming_idx)
1896 .finish_non_exhaustive()
1898 }
1899}
1900
/// Guard that logs a warning when dropped, unless it was dismissed first.
struct IncomingImproperDropWarner;
1902
impl IncomingImproperDropWarner {
    /// Consume the guard without running its `Drop` implementation, suppressing the warning.
    fn dismiss(self) {
        mem::forget(self);
    }
}
1908
/// Reached only when the guard was not dismissed, i.e. the owning `Incoming` was dropped
/// without being handed back to the endpoint.
impl Drop for IncomingImproperDropWarner {
    fn drop(&mut self) {
        warn!(
            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
             (may cause memory leak and eventual inability to accept new connections)"
        );
    }
}
1917
/// Reasons a connection attempt can fail before it starts.
///
/// NOTE(review): presumably returned by the endpoint's connect path, which is outside this view.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectError {
    /// The endpoint is stopping.
    #[error("endpoint stopping")]
    EndpointStopping,
    /// Not enough of the local CID space is available.
    #[error("CIDs exhausted")]
    CidsExhausted,
    /// The supplied server name was invalid; carries the offending name.
    #[error("invalid server name: {0}")]
    InvalidServerName(String),
    /// The supplied remote address was invalid; carries the offending address.
    #[error("invalid remote address: {0}")]
    InvalidRemoteAddress(SocketAddr),
    /// No default client configuration is available.
    #[error("no default client config")]
    NoDefaultClientConfig,
    /// The requested QUIC version is not supported.
    #[error("unsupported QUIC version")]
    UnsupportedVersion,
    /// A TLS error occurred; carries the error rendered as a string.
    #[error("TLS error: {0}")]
    TlsError(String),
}
1953
/// Error returned when accepting an incoming connection fails.
#[derive(Debug)]
pub struct AcceptError {
    /// The underlying connection error.
    pub cause: ConnectionError,
    /// A packet to send to the peer in response, if one was produced.
    pub response: Option<Transmit>,
}
1962
/// Wraps a rustls error as `ConnectError::TlsError`, keeping only its string rendering.
impl From<rustls::Error> for ConnectError {
    fn from(error: rustls::Error) -> Self {
        ConnectError::TlsError(error.to_string())
    }
}
1968
/// Wraps a transport error as an `AcceptError` with no response packet.
impl From<crate::transport_error::Error> for AcceptError {
    fn from(error: crate::transport_error::Error) -> Self {
        Self {
            cause: ConnectionError::TransportError(error),
            response: None,
        }
    }
}
1977
/// Error returned by `Endpoint::retry` when a Retry cannot be sent for the given `Incoming`.
///
/// Carries the untouched `Incoming` so the caller can still accept, refuse or ignore it.
/// Boxed to keep the error type small.
#[derive(Debug, Error)]
#[error("retry() with validated Incoming")]
pub struct RetryError(Box<Incoming>);
1982
impl RetryError {
    /// Recover the `Incoming` that could not be retried.
    pub fn into_incoming(self) -> Incoming {
        *self.0
    }
}
1989
/// Reset tokens grouped by remote address, each mapping to the connection it belongs to.
///
/// Remote addresses with no remaining tokens are removed from the outer map.
#[derive(Default, Debug)]
struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1996
1997impl ResetTokenTable {
1998 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1999 self.0
2000 .entry(remote)
2001 .or_default()
2002 .insert(token, ch)
2003 .is_some()
2004 }
2005
2006 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
2007 use std::collections::hash_map::Entry;
2008 match self.0.entry(remote) {
2009 Entry::Vacant(_) => {}
2010 Entry::Occupied(mut e) => {
2011 e.get_mut().remove(&token);
2012 if e.get().is_empty() {
2013 e.remove_entry();
2014 }
2015 }
2016 }
2017 }
2018
2019 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
2020 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
2021 self.0.get(&remote)?.get(&token)
2022 }
2023}
2024
/// Address pair used to identify connections that use zero-length CIDs.
///
/// Only the local IP is stored, not a local port.
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
    /// The peer's address.
    remote: SocketAddr,
    /// The local IP address the traffic arrives on, if known.
    local_ip: Option<IpAddr>,
}