1use std::{
2 collections::{HashMap, VecDeque, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21 Side, Transmit, TransportConfig, TransportError,
22 cid_generator::ConnectionIdGenerator,
23 coding::BufMutExt,
24 config::{ClientConfig, EndpointConfig, ServerConfig},
25 connection::{Connection, ConnectionError, SideArgs},
26 crypto::{self, Keys, UnsupportedVersion},
27 frame,
28 nat_traversal_api::PeerId,
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31 PacketNumber, PartialDecode, ProtectedInitialHeader,
32 },
33 shared::{
34 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35 EndpointEvent, EndpointEventInner, IssuedCid,
36 },
37 relay::RelayStatisticsCollector,
38 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
39 transport_parameters::{PreferredAddress, TransportParameters},
40};
41
/// A single relay request held in [`RelayQueue`] until it can be delivered to its target peer.
#[derive(Debug, Clone)]
struct RelayQueueItem {
    /// Peer the frame is to be relayed to.
    target_peer_id: PeerId,
    /// The `PunchMeNow` frame to relay.
    frame: frame::PunchMeNow,
    /// Time the request was enqueued; compared against the queue's `request_timeout`.
    created_at: Instant,
    /// Number of times the queue has handed this item out for delivery.
    attempts: u32,
    /// Time of the most recent hand-out, or `None` if the item was never handed out.
    last_attempt: Option<Instant>,
}
56
/// Bounded, insertion-ordered queue of pending relay requests with per-peer rate limiting.
#[derive(Debug)]
struct RelayQueue {
    /// Pending requests keyed by sequence number, kept in insertion order.
    pending: IndexMap<u64, RelayQueueItem>,
    /// Sequence number assigned to the next inserted item; incremented on every insert.
    next_seq: u64,
    /// `enqueue` refuses new requests once `pending` holds this many items.
    max_queue_size: usize,
    /// Requests older than this (measured from `created_at`) are treated as expired.
    request_timeout: Duration,
    /// `requeue_failed` drops items whose `attempts` has reached this value.
    max_retries: u32,
    /// Minimum time since `last_attempt` before an item is handed out again.
    retry_interval: Duration,
    /// Per-target-peer timestamps of accepted requests, pruned to `rate_limit_window`.
    rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
    /// Maximum number of accepted requests per peer inside `rate_limit_window`.
    max_relays_per_peer: usize,
    /// Length of the sliding window used for rate limiting.
    rate_limit_window: Duration,
}
79
/// Counters describing address discovery activity.
///
/// All counters start at zero (`Default`). The code that updates them is not part of the
/// visible section of this file, so the per-field descriptions follow the field names only.
/// `PartialEq`/`Eq` are derived so snapshots can be compared directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct AddressDiscoveryStats {
    /// Number of frames sent.
    pub frames_sent: u64,
    /// Number of frames received.
    pub frames_received: u64,
    /// Number of addresses discovered.
    pub addresses_discovered: u64,
    /// Number of address changes detected.
    pub address_changes_detected: u64,
}
92
/// Counters describing relay request handling by the endpoint.
///
/// All counters start at zero (`Default`). `PartialEq`/`Eq` are derived so snapshots can be
/// compared directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RelayStats {
    /// Relay requests submitted for relaying, whether or not they succeeded.
    pub requests_received: u64,
    /// Requests handed to a connection successfully.
    pub requests_relayed: u64,
    /// Delivery attempts that a connection rejected.
    pub requests_failed: u64,
    /// Requests refused by the queue for a reason other than rate limiting.
    pub requests_dropped: u64,
    /// Requests removed from the queue because they exceeded the request timeout.
    pub requests_timed_out: u64,
    /// Requests refused because the target peer exceeded its rate limit.
    pub requests_rate_limited: u64,
    /// Queue length as of the last time the endpoint touched the relay queue.
    pub current_queue_size: usize,
}
111
112impl RelayQueue {
113 fn new() -> Self {
115 Self {
116 pending: IndexMap::new(),
117 next_seq: 0,
118 max_queue_size: 1000, request_timeout: Duration::from_secs(30), max_retries: 3,
121 retry_interval: Duration::from_millis(500), rate_limiter: HashMap::new(),
123 max_relays_per_peer: 10, rate_limit_window: Duration::from_secs(60), }
126 }
127
128 fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
130 if self.pending.len() >= self.max_queue_size {
132 warn!(
133 "Relay queue full, dropping request for peer {:?}",
134 target_peer_id
135 );
136 return false;
137 }
138
139 if !self.check_rate_limit(target_peer_id, now) {
141 warn!(
142 "Rate limit exceeded for peer {:?}, dropping relay request",
143 target_peer_id
144 );
145 return false;
146 }
147
148 let item = RelayQueueItem {
149 target_peer_id,
150 frame,
151 created_at: now,
152 attempts: 0,
153 last_attempt: None,
154 };
155
156 let seq = self.next_seq;
157 self.next_seq += 1;
158 self.pending.insert(seq, item);
159
160 self.record_relay_request(target_peer_id, now);
162
163 trace!(
164 "Queued relay request for peer {:?}, queue size: {}",
165 target_peer_id,
166 self.pending.len()
167 );
168 true
169 }
170
171 fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
173 self.cleanup_rate_limiter(now);
175
176 if let Some(requests) = self.rate_limiter.get(&peer_id) {
178 requests.len() < self.max_relays_per_peer
179 } else {
180 true }
182 }
183
184 fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
186 self.rate_limiter.entry(peer_id).or_default().push_back(now);
187 }
188
189 fn cleanup_rate_limiter(&mut self, now: Instant) {
191 self.rate_limiter.retain(|_, requests| {
192 requests.retain(|&request_time| {
193 now.saturating_duration_since(request_time) <= self.rate_limit_window
194 });
195 !requests.is_empty()
196 });
197 }
198
199 fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
201 let mut expired_keys = Vec::new();
203 let mut ready_key = None;
204
205 for (seq, item) in &self.pending {
206 if now.saturating_duration_since(item.created_at) > self.request_timeout {
208 expired_keys.push(*seq);
209 continue;
210 }
211
212 if item.attempts == 0
214 || item
215 .last_attempt
216 .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
217 {
218 ready_key = Some(*seq);
219 break;
220 }
221 }
222
223 for key in expired_keys {
225 if let Some(expired) = self.pending.shift_remove(&key) {
226 debug!(
227 "Relay request for peer {:?} timed out after {:?}",
228 expired.target_peer_id,
229 now.saturating_duration_since(expired.created_at)
230 );
231 }
232 }
233
234 if let Some(key) = ready_key {
236 if let Some(mut item) = self.pending.shift_remove(&key) {
237 item.attempts += 1;
238 item.last_attempt = Some(now);
239 return Some(item);
240 }
241 }
242
243 None
244 }
245
246 fn requeue_failed(&mut self, item: RelayQueueItem) {
248 if item.attempts < self.max_retries {
249 trace!(
250 "Requeuing failed relay request for peer {:?}, attempt {}/{}",
251 item.target_peer_id, item.attempts, self.max_retries
252 );
253 let seq = self.next_seq;
254 self.next_seq += 1;
255 self.pending.insert(seq, item);
256 } else {
257 debug!(
258 "Dropping relay request for peer {:?} after {} failed attempts",
259 item.target_peer_id, item.attempts
260 );
261 }
262 }
263
264 fn cleanup_expired(&mut self, now: Instant) -> usize {
266 let initial_len = self.pending.len();
267
268 let expired_keys: Vec<u64> = self
270 .pending
271 .iter()
272 .filter_map(|(seq, item)| {
273 if now.saturating_duration_since(item.created_at) > self.request_timeout {
274 Some(*seq)
275 } else {
276 None
277 }
278 })
279 .collect();
280
281 for key in expired_keys {
283 if let Some(expired) = self.pending.shift_remove(&key) {
284 debug!(
285 "Removing expired relay request for peer {:?}",
286 expired.target_peer_id
287 );
288 }
289 }
290
291 initial_len - self.pending.len()
292 }
293
294 fn len(&self) -> usize {
296 self.pending.len()
297 }
298}
299
/// State of a single QUIC endpoint: routing tables for incoming datagrams, per-connection
/// bookkeeping, and the relay queue used to forward `PunchMeNow` frames between peers.
pub struct Endpoint {
    /// Source of randomness for version-negotiation bytes, stateless-reset padding, retry
    /// tokens and per-connection seeds.
    rng: StdRng,
    /// Lookup tables used to route a datagram to a connection or a pending incoming.
    index: ConnectionIndex,
    /// Per-connection metadata, indexed by `ConnectionHandle`.
    connections: Slab<ConnectionMeta>,
    /// Generator for locally issued connection IDs.
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration.
    config: Arc<EndpointConfig>,
    /// Server configuration; when `None`, initial packets for unknown connections are not
    /// accepted.
    server_config: Option<Arc<ServerConfig>>,
    /// Forwarded to every `Connection` created by this endpoint.
    allow_mtud: bool,
    /// Time the most recent stateless reset was sent; used to space resets by
    /// `config.min_reset_interval`.
    last_stateless_reset: Option<Instant>,
    /// Datagrams buffered for incoming connection attempts that have not yet been accepted,
    /// refused, retried or ignored.
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` across all entries of `incoming_buffers`.
    all_incoming_buffers_total_bytes: u64,
    /// Maps a peer ID to the connection registered for it.
    peer_connections: HashMap<PeerId, ConnectionHandle>,
    /// Relay requests waiting for delivery.
    relay_queue: RelayQueue,
    /// Plain counters for relay activity.
    relay_stats: RelayStats,
    /// Collector fed with relay rate-limit outcomes and error categories.
    relay_stats_collector: RelayStatisticsCollector,
    /// Initialised to `true` in `new`; not read in the visible part of this file.
    address_discovery_enabled: bool,
    /// Optional callback taking `(Option<SocketAddr>, SocketAddr)`; initialised to `None` in
    /// `new`. NOTE(review): presumably (previous, new) address — confirm against the caller.
    address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
}
331
332impl Endpoint {
333 pub fn new(
344 config: Arc<EndpointConfig>,
345 server_config: Option<Arc<ServerConfig>>,
346 allow_mtud: bool,
347 rng_seed: Option<[u8; 32]>,
348 ) -> Self {
349 let rng_seed = rng_seed.or(config.rng_seed);
350 Self {
351 rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
352 index: ConnectionIndex::default(),
353 connections: Slab::new(),
354 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
355 config,
356 server_config,
357 allow_mtud,
358 last_stateless_reset: None,
359 incoming_buffers: Slab::new(),
360 all_incoming_buffers_total_bytes: 0,
361 peer_connections: HashMap::new(),
362 relay_queue: RelayQueue::new(),
363 relay_stats: RelayStats::default(),
364 relay_stats_collector: RelayStatisticsCollector::new(),
365 address_discovery_enabled: true, address_change_callback: None,
367 }
368 }
369
    /// Replaces the endpoint's server configuration; `None` removes it.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
374
    /// Associates `peer_id` with `connection_handle`, replacing any previous mapping for that
    /// peer.
    pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
        self.peer_connections.insert(peer_id, connection_handle);
        trace!(
            "Registered peer {:?} with connection {:?}",
            peer_id, connection_handle
        );
    }
383
384 pub fn unregister_peer(&mut self, peer_id: &PeerId) {
386 if let Some(handle) = self.peer_connections.remove(peer_id) {
387 trace!(
388 "Unregistered peer {:?} from connection {:?}",
389 peer_id, handle
390 );
391 }
392 }
393
    /// Returns the connection handle registered for `peer_id`, or `None` if there is none.
    pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
        self.peer_connections.get(peer_id).copied()
    }
398
399 pub(crate) fn queue_frame_for_peer(
401 &mut self,
402 peer_id: &PeerId,
403 frame: frame::PunchMeNow,
404 ) -> bool {
405 self.relay_stats.requests_received += 1;
406
407 if let Some(ch) = self.lookup_peer_connection(peer_id) {
408 if self.relay_frame_to_connection(ch, frame.clone()) {
410 self.relay_stats.requests_relayed += 1;
411 self.relay_stats_collector.record_rate_limit(true);
413 trace!(
414 "Immediately relayed frame to peer {:?} via connection {:?}",
415 peer_id, ch
416 );
417 return true;
418 }
419 }
420
421 let now = Instant::now();
423 if self.relay_queue.enqueue(*peer_id, frame, now) {
424 self.relay_stats.current_queue_size = self.relay_queue.len();
425 self.relay_stats_collector.record_rate_limit(true);
427 trace!("Queued relay request for peer {:?}", peer_id);
428 true
429 } else {
430 if !self.relay_queue.check_rate_limit(*peer_id, now) {
432 self.relay_stats.requests_rate_limited += 1;
433 self.relay_stats_collector.record_rate_limit(false);
435 self.relay_stats_collector.record_error("rate_limited");
437 } else {
438 self.relay_stats.requests_dropped += 1;
439 self.relay_stats_collector.record_error("resource_exhausted");
441 }
442 false
443 }
444 }
445
446 fn relay_frame_to_connection(
448 &mut self,
449 ch: ConnectionHandle,
450 frame: frame::PunchMeNow,
451 ) -> bool {
452 let _event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame));
454 if let Some(_conn) = self.connections.get_mut(ch.0) {
455 }
462 true
466 }
467
468 pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
470 if let Some(connection) = self.connections.get_mut(connection_handle.0) {
471 connection.peer_id = Some(peer_id);
472 self.register_peer(peer_id, connection_handle);
473
474 self.process_queued_relays_for_peer(peer_id);
476 }
477 }
478
479 fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
481 let _now = Instant::now();
482 let mut processed = 0;
483
484 let mut items_to_process = Vec::new();
486 let mut keys_to_remove = Vec::new();
487
488 for (seq, item) in &self.relay_queue.pending {
490 if item.target_peer_id == peer_id {
491 items_to_process.push(item.clone());
492 keys_to_remove.push(*seq);
493 }
494 }
495
496 for key in keys_to_remove {
498 self.relay_queue.pending.shift_remove(&key);
499 }
500
501 for item in items_to_process {
503 if let Some(ch) = self.lookup_peer_connection(&peer_id) {
504 if self.relay_frame_to_connection(ch, item.frame.clone()) {
505 self.relay_stats.requests_relayed += 1;
506 processed += 1;
507 trace!("Processed queued relay for peer {:?}", peer_id);
508 } else {
509 self.relay_queue.requeue_failed(item);
511 self.relay_stats.requests_failed += 1;
512 }
513 }
514 }
515
516 self.relay_stats.current_queue_size = self.relay_queue.len();
517
518 if processed > 0 {
519 debug!(
520 "Processed {} queued relay requests for peer {:?}",
521 processed, peer_id
522 );
523 }
524 }
525
526 pub fn process_relay_queue(&mut self) {
528 let now = Instant::now();
529 let mut processed = 0;
530 let mut failed = 0;
531
532 while let Some(item) = self.relay_queue.next_ready(now) {
534 if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
535 if self.relay_frame_to_connection(ch, item.frame.clone()) {
536 self.relay_stats.requests_relayed += 1;
537 processed += 1;
538 trace!(
539 "Successfully relayed frame to peer {:?}",
540 item.target_peer_id
541 );
542 } else {
543 self.relay_queue.requeue_failed(item);
545 self.relay_stats.requests_failed += 1;
546 self.relay_stats_collector.record_error("connection_failure");
548 failed += 1;
549 }
550 } else {
551 self.relay_queue.requeue_failed(item);
553 self.relay_stats_collector.record_error("peer_not_found");
555 failed += 1;
556 }
557 }
558
559 let expired = self.relay_queue.cleanup_expired(now);
561 if expired > 0 {
562 self.relay_stats.requests_timed_out += expired as u64;
563 for _ in 0..expired {
565 self.relay_stats_collector.record_error("request_timeout");
566 }
567 debug!("Cleaned up {} expired relay requests", expired);
568 }
569
570 self.relay_stats.current_queue_size = self.relay_queue.len();
571
572 if processed > 0 || failed > 0 {
573 trace!(
574 "Relay queue processing: {} processed, {} failed, {} in queue",
575 processed,
576 failed,
577 self.relay_queue.len()
578 );
579 }
580 }
581
    /// Returns the endpoint's plain relay counters.
    pub fn relay_stats(&self) -> &RelayStats {
        &self.relay_stats
    }
586
    /// Pushes the current queue counters into the statistics collector and returns the
    /// statistics it collects.
    pub fn comprehensive_relay_stats(&self) -> crate::relay::RelayStatistics {
        // Refresh the collector with the latest counters before collecting.
        self.relay_stats_collector.update_queue_stats(&self.relay_stats);
        self.relay_stats_collector.collect_statistics()
    }
593
    /// Returns the relay statistics collector owned by this endpoint.
    pub fn relay_stats_collector(&self) -> &RelayStatisticsCollector {
        &self.relay_stats_collector
    }
598
    /// Returns the number of relay requests currently queued.
    pub fn relay_queue_len(&self) -> usize {
        self.relay_queue.len()
    }
603
    /// Processes an `EndpointEvent` emitted by the connection identified by `ch`.
    ///
    /// Returns a `ConnectionEvent` to pass back to that connection when the event calls for
    /// one (new connection IDs, or an address frame to queue); otherwise returns `None`.
    ///
    /// Indexes `self.connections[ch]` for `ResetToken` and `RetireConnectionId`, so `ch` must
    /// refer to a live connection for those events.
    pub fn handle_event(
        &mut self,
        ch: ConnectionHandle,
        event: EndpointEvent,
    ) -> Option<ConnectionEvent> {
        use EndpointEventInner::*;
        match event.0 {
            EndpointEventInner::NeedIdentifiers(now, n) => {
                return Some(self.send_new_identifiers(now, ch, n));
            }
            ResetToken(remote, token) => {
                // Drop the previously indexed token for this connection before indexing the
                // new one.
                if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
                    self.index.connection_reset_tokens.remove(old.0, old.1);
                }
                if self.index.connection_reset_tokens.insert(remote, token, ch) {
                    warn!("duplicate reset token");
                }
            }
            RetireConnectionId(now, seq, allow_more_cids) => {
                if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
                    trace!("peer retired CID {}: {}", seq, cid);
                    self.index.retire(cid);
                    // Issue one replacement CID when the connection allows it.
                    if allow_more_cids {
                        return Some(self.send_new_identifiers(now, ch, 1));
                    }
                }
            }
            RelayPunchMeNow(target_peer_id, punch_me_now) => {
                let peer_id = PeerId(target_peer_id);
                if self.queue_frame_for_peer(&peer_id, punch_me_now) {
                    trace!(
                        "Successfully queued PunchMeNow frame for relay to peer {:?}",
                        peer_id
                    );
                } else {
                    warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
                }
            }
            SendAddressFrame(add_address_frame) => {
                // Hand the frame straight back to the originating connection for queuing.
                return Some(ConnectionEvent(ConnectionEventInner::QueueAddAddress(
                    add_address_frame,
                )));
            }
            NatCandidateValidated { address, challenge } => {
                // Only logged here; no endpoint state changes on validation.
                trace!(
                    "NAT candidate validation succeeded for {} with challenge {:016x}",
                    address, challenge
                );

                debug!("NAT candidate {} validated successfully", address);
            }
            Drained => {
                if let Some(conn) = self.connections.try_remove(ch.0) {
                    self.index.remove(&conn);
                    // Also forget the peer mapping recorded for this connection, if any.
                    if let Some(peer_id) = conn.peer_id {
                        self.peer_connections.remove(&peer_id);
                        trace!("Cleaned up peer connection mapping for {:?}", peer_id);
                    }
                } else {
                    error!(id = ch.0, "unknown connection drained");
                }
            }
        }
        None
    }
682
    /// Processes an incoming UDP datagram.
    ///
    /// * `now` - time of receipt, stored in the resulting event.
    /// * `remote` / `local_ip` - addresses the datagram was received from / on.
    /// * `ecn` - ECN codepoint of the datagram, if any.
    /// * `data` - the datagram payload.
    /// * `buf` - scratch buffer that receives the bytes of any response to transmit.
    ///
    /// Returns a `DatagramEvent` describing what the caller must do (forward an event to a
    /// connection, handle a new incoming connection, or transmit a response), or `None` if the
    /// datagram was buffered or dropped.
    pub fn handle(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        ecn: Option<EcnCodepoint>,
        data: BytesMut,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        // Captured up front because `data` is consumed by the decoder below.
        let datagram_len = data.len();
        let event = match PartialDecode::new(
            data,
            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
            &self.config.supported_versions,
            self.config.grease_quic_bit,
        ) {
            Ok((first_decode, remaining)) => DatagramConnectionEvent {
                now,
                remote,
                ecn,
                first_decode,
                remaining,
            },
            Err(PacketDecodeError::UnsupportedVersion {
                src_cid,
                dst_cid,
                version,
            }) => {
                // Version negotiation is only sent when a server configuration is present.
                if self.server_config.is_none() {
                    debug!("dropping packet with unsupported version");
                    return None;
                }
                trace!("sending version negotiation");
                // Reply with the peer's connection IDs swapped.
                Header::VersionNegotiate {
                    random: self.rng.r#gen::<u8>() | 0x40,
                    src_cid: dst_cid,
                    dst_cid: src_cid,
                }
                .encode(buf);
                // First listed version is a fixed value chosen to differ from the version the
                // peer sent. NOTE(review): presumably a reserved/grease version — confirm.
                buf.write::<u32>(match version {
                    0x0a1a_2a3a => 0x0a1a_2a4a,
                    _ => 0x0a1a_2a3a,
                });
                for &version in &self.config.supported_versions {
                    buf.write(version);
                }
                return Some(DatagramEvent::Response(Transmit {
                    destination: remote,
                    ecn: None,
                    size: buf.len(),
                    segment_size: None,
                    src_ip: local_ip,
                }));
            }
            Err(e) => {
                trace!("malformed header: {}", e);
                return None;
            }
        };

        let addresses = FourTuple { remote, local_ip };
        let dst_cid = event.first_decode.dst_cid();

        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
            match route_to {
                RouteDatagramTo::Incoming(incoming_idx) => {
                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
                    let config = &self.server_config.as_ref().unwrap();

                    // Buffer the datagram only if both the per-incoming limit and the global
                    // limit still hold after adding it; otherwise it is silently dropped.
                    if incoming_buffer
                        .total_bytes
                        .checked_add(datagram_len as u64)
                        .is_some_and(|n| n <= config.incoming_buffer_size)
                        && self
                            .all_incoming_buffers_total_bytes
                            .checked_add(datagram_len as u64)
                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
                    {
                        incoming_buffer.datagrams.push(event);
                        incoming_buffer.total_bytes += datagram_len as u64;
                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
                    }

                    None
                }
                RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
                    ch,
                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
                )),
            }
        } else if event.first_decode.initial_header().is_some() {
            // No known route and the packet is an Initial: treat it as a connection attempt.
            self.handle_first_packet(datagram_len, event, addresses, buf)
        } else if event.first_decode.has_long_header() {
            debug!(
                "ignoring non-initial packet for unknown connection {}",
                dst_cid
            );
            None
        } else if !event.first_decode.is_initial()
            && self.local_cid_generator.validate(dst_cid).is_err()
        {
            // The destination CID was not produced by our generator; do not respond.
            debug!("dropping packet with invalid CID");
            None
        } else if dst_cid.is_empty() {
            trace!("dropping unrecognized short packet without ID");
            None
        } else {
            // Remaining case: answer with a stateless reset, if one can be sent.
            self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
                .map(DatagramEvent::Response)
        }
    }
804
    /// Builds a stateless reset for `dst_cid` in `buf` in response to an unroutable datagram
    /// of `inciting_dgram_len` bytes.
    ///
    /// Returns `None` without writing anything when the previous reset was sent less than
    /// `config.min_reset_interval` ago, or when the inciting datagram is too small for a
    /// strictly shorter reset.
    ///
    /// NOTE(review): `buf.resize(padding_len, 0)` sets the buffer length outright, so this
    /// appears to assume `buf` is empty on entry — confirm against callers.
    fn stateless_reset(
        &mut self,
        now: Instant,
        inciting_dgram_len: usize,
        addresses: FourTuple,
        dst_cid: ConnectionId,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        if self
            .last_stateless_reset
            .is_some_and(|last| last + self.config.min_reset_interval > now)
        {
            debug!("ignoring unexpected packet within minimum stateless reset interval");
            return None;
        }

        const MIN_PADDING_LEN: usize = 5;

        // `headroom - 1` keeps padding + token strictly shorter than the inciting datagram
        // (checked by the `debug_assert!` below).
        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
            _ => {
                debug!(
                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
                    inciting_dgram_len
                );
                return None;
            }
        };

        debug!(
            "sending stateless reset for {} to {}",
            dst_cid, addresses.remote
        );
        self.last_stateless_reset = Some(now);
        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
        // Use a random padding length when there is room above the ideal minimum; otherwise
        // use all the padding that fits.
        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
            max_padding_len
        } else {
            self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
        };
        buf.reserve(padding_len + RESET_TOKEN_SIZE);
        buf.resize(padding_len, 0);
        self.rng.fill_bytes(&mut buf[0..padding_len]);
        // Force the two high bits of the first byte to `01`, keeping the rest random.
        buf[0] = 0b0100_0000 | (buf[0] >> 2);
        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));

        debug_assert!(buf.len() < inciting_dgram_len);

        Some(Transmit {
            destination: addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: addresses.local_ip,
        })
    }
865
866 pub fn connect(
868 &mut self,
869 now: Instant,
870 config: ClientConfig,
871 remote: SocketAddr,
872 server_name: &str,
873 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
874 if self.cids_exhausted() {
875 return Err(ConnectError::CidsExhausted);
876 }
877 if remote.port() == 0 || remote.ip().is_unspecified() {
878 return Err(ConnectError::InvalidRemoteAddress(remote));
879 }
880 if !self.config.supported_versions.contains(&config.version) {
881 return Err(ConnectError::UnsupportedVersion);
882 }
883
884 let remote_id = (config.initial_dst_cid_provider)();
885 trace!(initial_dcid = %remote_id);
886
887 let ch = ConnectionHandle(self.connections.vacant_key());
888 let loc_cid = self.new_cid(ch);
889 let params = TransportParameters::new(
890 &config.transport,
891 &self.config,
892 self.local_cid_generator.as_ref(),
893 loc_cid,
894 None,
895 &mut self.rng,
896 );
897 let tls = config
898 .crypto
899 .start_session(config.version, server_name, ¶ms)?;
900
901 let conn = self.add_connection(
902 ch,
903 config.version,
904 remote_id,
905 loc_cid,
906 remote_id,
907 FourTuple {
908 remote,
909 local_ip: None,
910 },
911 now,
912 tls,
913 config.transport,
914 SideArgs::Client {
915 token_store: config.token_store,
916 server_name: server_name.into(),
917 },
918 );
919 Ok((ch, conn))
920 }
921
922 fn send_new_identifiers(
923 &mut self,
924 now: Instant,
925 ch: ConnectionHandle,
926 num: u64,
927 ) -> ConnectionEvent {
928 let mut ids = vec![];
929 for _ in 0..num {
930 let id = self.new_cid(ch);
931 let meta = &mut self.connections[ch];
932 let sequence = meta.cids_issued;
933 meta.cids_issued += 1;
934 meta.loc_cids.insert(sequence, id);
935 ids.push(IssuedCid {
936 sequence,
937 id,
938 reset_token: ResetToken::new(&*self.config.reset_key, id),
939 });
940 }
941 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
942 }
943
944 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
946 loop {
947 let cid = self.local_cid_generator.generate_cid();
948 if cid.is_empty() {
949 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
951 return cid;
952 }
953 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
954 e.insert(ch);
955 break cid;
956 }
957 }
958 }
959
    /// Handles an Initial packet that does not belong to any known connection.
    ///
    /// Returns `DatagramEvent::NewConnection` when the packet passes validation and an
    /// incoming buffer has been registered for it, `DatagramEvent::Response` when a reset or
    /// close should be transmitted from `buf`, and `None` when the packet is dropped.
    ///
    /// Panics if `event.first_decode` carries no Initial header; the caller checks this.
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        addresses: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        let header = event.first_decode.initial_header().unwrap();

        // Without a server configuration the endpoint cannot accept connections; answer with
        // a stateless reset if possible.
        let Some(server_config) = &self.server_config else {
            debug!("packet for unrecognized connection {}", dst_cid);
            return self
                .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
                .map(DatagramEvent::Response);
        };

        if datagram_len < MIN_INITIAL_SIZE as usize {
            debug!("ignoring short initial for connection {}", dst_cid);
            return None;
        }

        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
            Ok(keys) => keys,
            Err(UnsupportedVersion) => {
                debug!(
                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                    header.version
                );
                return None;
            }
        };

        // Checks that need only the still-protected header; a failure is answered with a
        // close.
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                addresses,
                &crypto,
                &header.src_cid,
                reason,
                buf,
            )));
        }

        // Finish decoding using the remote header key from the initial keys.
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        let Header::Initial(header) = packet.header else {
            panic!("non-initial packet in handle_first_packet()");
        };

        // Re-fetch as an owned `Arc`; `self` was borrowed mutably since the first lookup.
        let server_config = self.server_config.as_ref().unwrap().clone();

        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
            Ok(token) => token,
            Err(InvalidRetryTokenError) => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    addresses,
                    &crypto,
                    &header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // Register a buffer so further datagrams for this destination CID are held until the
        // application decides what to do with the incoming connection.
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            addresses,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
1060
1061 #[allow(clippy::result_large_err)]
1064 pub fn accept(
1065 &mut self,
1066 mut incoming: Incoming,
1067 now: Instant,
1068 buf: &mut Vec<u8>,
1069 server_config: Option<Arc<ServerConfig>>,
1070 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1071 let remote_address_validated = incoming.remote_address_validated();
1072 incoming.improper_drop_warner.dismiss();
1073 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1074 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1075
1076 let packet_number = incoming.packet.header.number.expand(0);
1077 let InitialHeader {
1078 src_cid,
1079 dst_cid,
1080 version,
1081 ..
1082 } = incoming.packet.header;
1083 let server_config =
1084 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
1085
1086 if server_config
1087 .transport
1088 .max_idle_timeout
1089 .is_some_and(|timeout| {
1090 incoming.received_at + Duration::from_millis(timeout.into()) <= now
1091 })
1092 {
1093 debug!("abandoning accept of stale initial");
1094 self.index.remove_initial(dst_cid);
1095 return Err(AcceptError {
1096 cause: ConnectionError::TimedOut,
1097 response: None,
1098 });
1099 }
1100
1101 if self.cids_exhausted() {
1102 debug!("refusing connection");
1103 self.index.remove_initial(dst_cid);
1104 return Err(AcceptError {
1105 cause: ConnectionError::CidsExhausted,
1106 response: Some(self.initial_close(
1107 version,
1108 incoming.addresses,
1109 &incoming.crypto,
1110 &src_cid,
1111 TransportError::CONNECTION_REFUSED(""),
1112 buf,
1113 )),
1114 });
1115 }
1116
1117 if incoming
1118 .crypto
1119 .packet
1120 .remote
1121 .decrypt(
1122 packet_number,
1123 &incoming.packet.header_data,
1124 &mut incoming.packet.payload,
1125 )
1126 .is_err()
1127 {
1128 debug!(packet_number, "failed to authenticate initial packet");
1129 self.index.remove_initial(dst_cid);
1130 return Err(AcceptError {
1131 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1132 response: None,
1133 });
1134 };
1135
1136 let ch = ConnectionHandle(self.connections.vacant_key());
1137 let loc_cid = self.new_cid(ch);
1138 let mut params = TransportParameters::new(
1139 &server_config.transport,
1140 &self.config,
1141 self.local_cid_generator.as_ref(),
1142 loc_cid,
1143 Some(&server_config),
1144 &mut self.rng,
1145 );
1146 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1147 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1148 params.retry_src_cid = incoming.token.retry_src_cid;
1149 let mut pref_addr_cid = None;
1150 if server_config.has_preferred_address() {
1151 let cid = self.new_cid(ch);
1152 pref_addr_cid = Some(cid);
1153 params.preferred_address = Some(PreferredAddress {
1154 address_v4: server_config.preferred_address_v4,
1155 address_v6: server_config.preferred_address_v6,
1156 connection_id: cid,
1157 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1158 });
1159 }
1160
1161 let tls = server_config.crypto.clone().start_session(version, ¶ms);
1162 let transport_config = server_config.transport.clone();
1163 let mut conn = self.add_connection(
1164 ch,
1165 version,
1166 dst_cid,
1167 loc_cid,
1168 src_cid,
1169 incoming.addresses,
1170 incoming.received_at,
1171 tls,
1172 transport_config,
1173 SideArgs::Server {
1174 server_config,
1175 pref_addr_cid,
1176 path_validated: remote_address_validated,
1177 },
1178 );
1179 self.index.insert_initial(dst_cid, ch);
1180
1181 match conn.handle_first_packet(
1182 incoming.received_at,
1183 incoming.addresses.remote,
1184 incoming.ecn,
1185 packet_number,
1186 incoming.packet,
1187 incoming.rest,
1188 ) {
1189 Ok(()) => {
1190 trace!(id = ch.0, icid = %dst_cid, "new connection");
1191
1192 for event in incoming_buffer.datagrams {
1193 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1194 }
1195
1196 Ok((ch, conn))
1197 }
1198 Err(e) => {
1199 debug!("handshake failed: {}", e);
1200 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1201 let response = match e {
1202 ConnectionError::TransportError(ref e) => Some(self.initial_close(
1203 version,
1204 incoming.addresses,
1205 &incoming.crypto,
1206 &src_cid,
1207 e.clone(),
1208 buf,
1209 )),
1210 _ => None,
1211 };
1212 Err(AcceptError { cause: e, response })
1213 }
1214 }
1215 }
1216
1217 fn early_validate_first_packet(
1219 &mut self,
1220 header: &ProtectedInitialHeader,
1221 ) -> Result<(), TransportError> {
1222 let config = &self.server_config.as_ref().unwrap();
1223 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1224 return Err(TransportError::CONNECTION_REFUSED(""));
1225 }
1226
1227 if header.dst_cid.len() < 8
1232 && (header.token_pos.is_empty()
1233 || header.dst_cid.len() != self.local_cid_generator.cid_len())
1234 {
1235 debug!(
1236 "rejecting connection due to invalid DCID length {}",
1237 header.dst_cid.len()
1238 );
1239 return Err(TransportError::PROTOCOL_VIOLATION(
1240 "invalid destination CID length",
1241 ));
1242 }
1243
1244 Ok(())
1245 }
1246
1247 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1249 self.clean_up_incoming(&incoming);
1250 incoming.improper_drop_warner.dismiss();
1251
1252 self.initial_close(
1253 incoming.packet.header.version,
1254 incoming.addresses,
1255 &incoming.crypto,
1256 &incoming.packet.header.src_cid,
1257 TransportError::CONNECTION_REFUSED(""),
1258 buf,
1259 )
1260 }
1261
    /// Responds to the incoming connection with a Retry packet written into `buf`.
    ///
    /// # Errors
    ///
    /// Returns `RetryError` carrying `incoming` back, untouched, when `incoming.may_retry()`
    /// is false.
    ///
    /// # Panics
    ///
    /// Panics if the endpoint has no server configuration.
    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
        if !incoming.may_retry() {
            return Err(RetryError(Box::new(incoming)));
        }

        self.clean_up_incoming(&incoming);
        incoming.improper_drop_warner.dismiss();

        let server_config = self.server_config.as_ref().unwrap();

        // Generated directly from the generator, so this CID is not added to the endpoint's
        // connection ID index.
        let loc_cid = self.local_cid_generator.generate_cid();

        // The token records the client's address and original destination CID.
        let payload = TokenPayload::Retry {
            address: incoming.addresses.remote,
            orig_dst_cid: incoming.packet.header.dst_cid,
            issued: server_config.time_source.now(),
        };
        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);

        let header = Header::Retry {
            src_cid: loc_cid,
            dst_cid: incoming.packet.header.src_cid,
            version: incoming.packet.header.version,
        };

        // Layout in `buf`: encoded header, token, then the retry tag computed over what has
        // been written so far.
        let encode = header.encode(buf);
        buf.put_slice(&token);
        buf.extend_from_slice(&server_config.crypto.retry_tag(
            incoming.packet.header.version,
            &incoming.packet.header.dst_cid,
            buf,
        ));
        encode.finish(buf, &*incoming.crypto.header.local, None);

        Ok(Transmit {
            destination: incoming.addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: incoming.addresses.local_ip,
        })
    }
1313
1314 pub fn ignore(&mut self, incoming: Incoming) {
1319 self.clean_up_incoming(&incoming);
1320 incoming.improper_drop_warner.dismiss();
1321 }
1322
1323 fn clean_up_incoming(&mut self, incoming: &Incoming) {
1325 self.index.remove_initial(incoming.packet.header.dst_cid);
1326 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1327 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1328 }
1329
1330 fn add_connection(
1331 &mut self,
1332 ch: ConnectionHandle,
1333 version: u32,
1334 init_cid: ConnectionId,
1335 loc_cid: ConnectionId,
1336 rem_cid: ConnectionId,
1337 addresses: FourTuple,
1338 now: Instant,
1339 tls: Box<dyn crypto::Session>,
1340 transport_config: Arc<TransportConfig>,
1341 side_args: SideArgs,
1342 ) -> Connection {
1343 let mut rng_seed = [0; 32];
1344 self.rng.fill_bytes(&mut rng_seed);
1345 let side = side_args.side();
1346 let pref_addr_cid = side_args.pref_addr_cid();
1347 let conn = Connection::new(
1348 self.config.clone(),
1349 transport_config,
1350 init_cid,
1351 loc_cid,
1352 rem_cid,
1353 addresses.remote,
1354 addresses.local_ip,
1355 tls,
1356 self.local_cid_generator.as_ref(),
1357 now,
1358 version,
1359 self.allow_mtud,
1360 rng_seed,
1361 side_args,
1362 );
1363
1364 let mut cids_issued = 0;
1365 let mut loc_cids = FxHashMap::default();
1366
1367 loc_cids.insert(cids_issued, loc_cid);
1368 cids_issued += 1;
1369
1370 if let Some(cid) = pref_addr_cid {
1371 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1372 loc_cids.insert(cids_issued, cid);
1373 cids_issued += 1;
1374 }
1375
1376 let id = self.connections.insert(ConnectionMeta {
1377 init_cid,
1378 cids_issued,
1379 loc_cids,
1380 addresses,
1381 side,
1382 reset_token: None,
1383 peer_id: None,
1384 });
1385 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1386
1387 self.index.insert_conn(addresses, loc_cid, ch, side);
1388
1389 conn
1390 }
1391
1392 fn initial_close(
1393 &mut self,
1394 version: u32,
1395 addresses: FourTuple,
1396 crypto: &Keys,
1397 remote_id: &ConnectionId,
1398 reason: TransportError,
1399 buf: &mut Vec<u8>,
1400 ) -> Transmit {
1401 let local_id = self.local_cid_generator.generate_cid();
1405 let number = PacketNumber::U8(0);
1406 let header = Header::Initial(InitialHeader {
1407 dst_cid: *remote_id,
1408 src_cid: local_id,
1409 number,
1410 token: Bytes::new(),
1411 version,
1412 });
1413
1414 let partial_encode = header.encode(buf);
1415 let max_len =
1416 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1417 frame::Close::from(reason).encode(buf, max_len);
1418 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1419 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1420 Transmit {
1421 destination: addresses.remote,
1422 ecn: None,
1423 size: buf.len(),
1424 segment_size: None,
1425 src_ip: addresses.local_ip,
1426 }
1427 }
1428
    /// Access the configuration used by this endpoint.
    pub fn config(&self) -> &EndpointConfig {
        &self.config
    }
1433
    /// Record whether address discovery is enabled for this endpoint.
    ///
    /// This only stores the flag on the endpoint.
    // NOTE(review): how the flag reaches individual connections is not visible here.
    pub fn enable_address_discovery(&mut self, enabled: bool) {
        self.address_discovery_enabled = enabled;
    }
1445
    /// Whether address discovery is currently flagged as enabled on this endpoint.
    pub fn address_discovery_enabled(&self) -> bool {
        self.address_discovery_enabled
    }
1450
    /// Addresses discovered across connections.
    ///
    /// Currently this always returns an empty vector: the endpoint does not collect
    /// discovered addresses itself.
    pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
        Vec::new()
    }
1462
    /// Install `callback` as the endpoint's address-change callback, replacing any
    /// previously installed one.
    ///
    /// The callback receives an `Option<SocketAddr>` and a `SocketAddr`.
    // NOTE(review): presumably (previous address, new address) — confirm at the call site.
    pub fn set_address_change_callback<F>(&mut self, callback: F)
    where
        F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
    {
        self.address_change_callback = Some(Box::new(callback));
    }
1473
    /// Remove the address-change callback, if one is installed.
    pub fn clear_address_change_callback(&mut self) {
        self.address_change_callback = None;
    }
1478
    /// Address discovery statistics for this endpoint.
    ///
    /// Currently this always returns [`AddressDiscoveryStats::default()`]: the endpoint
    /// does not aggregate statistics itself.
    pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
        AddressDiscoveryStats::default()
    }
1487
    /// Number of connections currently held in the endpoint's connection table.
    pub fn open_connections(&self) -> usize {
        self.connections.len()
    }
1492
    /// Running total of bytes held in the buffers of not-yet-handled incoming
    /// connection attempts.
    pub fn incoming_buffer_bytes(&self) -> u64 {
        self.all_incoming_buffers_total_bytes
    }
1498
1499 #[cfg(test)]
1500 pub(crate) fn known_connections(&self) -> usize {
1501 let x = self.connections.len();
1502 debug_assert_eq!(x, self.index.connection_ids_initial.len());
1503 debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1505 debug_assert!(x >= self.index.incoming_connection_remotes.len());
1507 debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1508 x
1509 }
1510
    /// Number of connection IDs currently present in the routing index (test-only).
    #[cfg(test)]
    pub(crate) fn known_cids(&self) -> usize {
        self.index.connection_ids.len()
    }
1515
1516 fn cids_exhausted(&self) -> bool {
1521 self.local_cid_generator.cid_len() <= 4
1522 && self.local_cid_generator.cid_len() != 0
1523 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1524 - self.index.connection_ids.len())
1525 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1526 }
1527}
1528
1529impl fmt::Debug for Endpoint {
1530 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1531 fmt.debug_struct("Endpoint")
1532 .field("rng", &self.rng)
1533 .field("index", &self.index)
1534 .field("connections", &self.connections)
1535 .field("config", &self.config)
1536 .field("server_config", &self.server_config)
1537 .field("incoming_buffers.len", &self.incoming_buffers.len())
1539 .field(
1540 "all_incoming_buffers_total_bytes",
1541 &self.all_incoming_buffers_total_bytes,
1542 )
1543 .finish()
1544 }
1545}
1546
/// Datagrams buffered for an incoming connection attempt that has not been handled yet.
#[derive(Default)]
struct IncomingBuffer {
    /// Buffered datagram events.
    datagrams: Vec<DatagramConnectionEvent>,
    /// Byte total accounted to this buffer; subtracted from the endpoint-wide total when
    /// the buffer is removed.
    total_bytes: u64,
}
1553
1554#[derive(Copy, Clone, Debug)]
1556enum RouteDatagramTo {
1557 Incoming(usize),
1558 Connection(ConnectionHandle),
1559}
1560
1561#[derive(Default, Debug)]
1563struct ConnectionIndex {
1564 connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1570 connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1574 incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1578 outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1587 connection_reset_tokens: ResetTokenTable,
1592}
1593
1594impl ConnectionIndex {
1595 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1597 if dst_cid.is_empty() {
1598 return;
1599 }
1600 self.connection_ids_initial
1601 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1602 }
1603
1604 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1606 if dst_cid.is_empty() {
1607 return;
1608 }
1609 let removed = self.connection_ids_initial.remove(&dst_cid);
1610 debug_assert!(removed.is_some());
1611 }
1612
1613 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1615 if dst_cid.is_empty() {
1616 return;
1617 }
1618 self.connection_ids_initial
1619 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1620 }
1621
1622 fn insert_conn(
1625 &mut self,
1626 addresses: FourTuple,
1627 dst_cid: ConnectionId,
1628 connection: ConnectionHandle,
1629 side: Side,
1630 ) {
1631 match dst_cid.len() {
1632 0 => match side {
1633 Side::Server => {
1634 self.incoming_connection_remotes
1635 .insert(addresses, connection);
1636 }
1637 Side::Client => {
1638 self.outgoing_connection_remotes
1639 .insert(addresses.remote, connection);
1640 }
1641 },
1642 _ => {
1643 self.connection_ids.insert(dst_cid, connection);
1644 }
1645 }
1646 }
1647
1648 fn retire(&mut self, dst_cid: ConnectionId) {
1650 self.connection_ids.remove(&dst_cid);
1651 }
1652
1653 fn remove(&mut self, conn: &ConnectionMeta) {
1655 if conn.side.is_server() {
1656 self.remove_initial(conn.init_cid);
1657 }
1658 for cid in conn.loc_cids.values() {
1659 self.connection_ids.remove(cid);
1660 }
1661 self.incoming_connection_remotes.remove(&conn.addresses);
1662 self.outgoing_connection_remotes
1663 .remove(&conn.addresses.remote);
1664 if let Some((remote, token)) = conn.reset_token {
1665 self.connection_reset_tokens.remove(remote, token);
1666 }
1667 }
1668
1669 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1671 let dst_cid = datagram.dst_cid();
1672 let is_empty_cid = dst_cid.is_empty();
1673
1674 if !is_empty_cid {
1676 if let Some(&ch) = self.connection_ids.get(dst_cid) {
1677 return Some(RouteDatagramTo::Connection(ch));
1678 }
1679 }
1680
1681 if datagram.is_initial() || datagram.is_0rtt() {
1683 if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1684 return Some(ch);
1685 }
1686 }
1687
1688 if is_empty_cid {
1690 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1692 return Some(RouteDatagramTo::Connection(ch));
1693 }
1694 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1695 return Some(RouteDatagramTo::Connection(ch));
1696 }
1697 }
1698
1699 let data = datagram.data();
1701 if data.len() < RESET_TOKEN_SIZE {
1702 return None;
1703 }
1704 self.connection_reset_tokens
1705 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1706 .cloned()
1707 .map(RouteDatagramTo::Connection)
1708 }
1709}
1710
/// Per-connection bookkeeping kept by the endpoint.
#[derive(Debug)]
pub(crate) struct ConnectionMeta {
    /// CID the connection was initially addressed by; removed from the initial-CID index
    /// for server-side connections when the connection is removed.
    init_cid: ConnectionId,
    /// Number of local CIDs issued so far.
    cids_issued: u64,
    /// Locally issued CIDs, keyed by sequence number.
    loc_cids: FxHashMap<u64, ConnectionId>,
    /// Address tuple the connection was registered with.
    addresses: FourTuple,
    /// Whether this is the client or server side of the connection.
    side: Side,
    /// Reset token registered for this connection, together with the remote address it
    /// was registered under.
    reset_token: Option<(SocketAddr, ResetToken)>,
    /// Peer identity associated with the connection, if any.
    // NOTE(review): where this is populated is not visible in this section.
    peer_id: Option<PeerId>,
}
1729
/// Handle identifying a connection within an endpoint.
///
/// Wraps the connection's slot index in the endpoint's connection table.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionHandle(pub usize);

impl From<ConnectionHandle> for usize {
    /// Unwrap the handle into its underlying index.
    fn from(ConnectionHandle(index): ConnectionHandle) -> Self {
        index
    }
}
1739
1740impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1741 type Output = ConnectionMeta;
1742 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1743 &self[ch.0]
1744 }
1745}
1746
1747impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1748 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1749 &mut self[ch.0]
1750 }
1751}
1752
/// Outcome of handing a received datagram to the endpoint.
pub enum DatagramEvent {
    /// An event to be delivered to the connection identified by the handle.
    ConnectionEvent(ConnectionHandle, ConnectionEvent),
    /// A new incoming connection attempt awaiting a decision.
    NewConnection(Incoming),
    /// A packet to transmit in response.
    Response(Transmit),
}
1762
/// An incoming connection attempt that has not yet been accepted, refused, retried, or
/// ignored.
///
/// The endpoint keeps state for every `Incoming`, so it should be passed back to one of
/// the endpoint's handling methods rather than simply dropped.
pub struct Incoming {
    /// Time associated with the receipt of the attempt.
    received_at: Instant,
    /// Remote address and optional local IP of the attempt.
    addresses: FourTuple,
    /// ECN codepoint recorded for the attempt, if any.
    ecn: Option<EcnCodepoint>,
    /// The Initial packet that started the attempt.
    packet: InitialPacket,
    /// Additional bytes kept alongside the Initial packet, if any.
    // NOTE(review): presumably the remainder of the first datagram — confirm.
    rest: Option<BytesMut>,
    /// Keys used to protect packets sent in response to the attempt.
    crypto: Keys,
    /// Token information associated with the Initial packet.
    token: IncomingToken,
    /// Key of this attempt's entry in the endpoint's incoming buffers.
    incoming_idx: usize,
    /// Logs a warning if the attempt is dropped without being handled.
    improper_drop_warner: IncomingImproperDropWarner,
}
1775
1776impl Incoming {
1777 pub fn local_ip(&self) -> Option<IpAddr> {
1781 self.addresses.local_ip
1782 }
1783
1784 pub fn remote_address(&self) -> SocketAddr {
1786 self.addresses.remote
1787 }
1788
1789 pub fn remote_address_validated(&self) -> bool {
1797 self.token.validated
1798 }
1799
1800 pub fn may_retry(&self) -> bool {
1805 self.token.retry_src_cid.is_none()
1806 }
1807
1808 pub fn orig_dst_cid(&self) -> &ConnectionId {
1810 &self.token.orig_dst_cid
1811 }
1812}
1813
1814impl fmt::Debug for Incoming {
1815 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1816 f.debug_struct("Incoming")
1817 .field("addresses", &self.addresses)
1818 .field("ecn", &self.ecn)
1819 .field("token", &self.token)
1822 .field("incoming_idx", &self.incoming_idx)
1823 .finish_non_exhaustive()
1825 }
1826}
1827
/// Guard whose destructor reports an `Incoming` that was dropped without being handled.
struct IncomingImproperDropWarner;

impl IncomingImproperDropWarner {
    /// Consume the guard without running its destructor.
    fn dismiss(self) {
        // Wrapping in `ManuallyDrop` suppresses the destructor, exactly like `mem::forget`
        let _ = mem::ManuallyDrop::new(self);
    }
}
1835
/// Logs a warning whenever the guard is dropped without having been dismissed.
impl Drop for IncomingImproperDropWarner {
    fn drop(&mut self) {
        warn!(
            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
             (may cause memory leak and eventual inability to accept new connections)"
        );
    }
}
1844
1845#[derive(Debug, Error, Clone, PartialEq, Eq)]
1849pub enum ConnectError {
1850 #[error("endpoint stopping")]
1854 EndpointStopping,
1855 #[error("CIDs exhausted")]
1859 CidsExhausted,
1860 #[error("invalid server name: {0}")]
1862 InvalidServerName(String),
1863 #[error("invalid remote address: {0}")]
1867 InvalidRemoteAddress(SocketAddr),
1868 #[error("no default client config")]
1872 NoDefaultClientConfig,
1873 #[error("unsupported QUIC version")]
1875 UnsupportedVersion,
1876}
1877
/// Error returned when an incoming connection attempt could not be accepted.
#[derive(Debug)]
pub struct AcceptError {
    /// The underlying reason for the failure.
    pub cause: ConnectionError,
    /// A response to transmit back, if there is one.
    pub response: Option<Transmit>,
}
1886
1887#[derive(Debug, Error)]
1889#[error("retry() with validated Incoming")]
1890pub struct RetryError(Box<Incoming>);
1891
1892impl RetryError {
1893 pub fn into_incoming(self) -> Incoming {
1895 *self.0
1896 }
1897}
1898
1899#[derive(Default, Debug)]
1904struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1905
1906impl ResetTokenTable {
1907 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1908 self.0
1909 .entry(remote)
1910 .or_default()
1911 .insert(token, ch)
1912 .is_some()
1913 }
1914
1915 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1916 use std::collections::hash_map::Entry;
1917 match self.0.entry(remote) {
1918 Entry::Vacant(_) => {}
1919 Entry::Occupied(mut e) => {
1920 e.get_mut().remove(&token);
1921 if e.get().is_empty() {
1922 e.remove_entry();
1923 }
1924 }
1925 }
1926 }
1927
1928 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1929 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1930 self.0.get(&remote)?.get(&token)
1931 }
1932}
1933
/// Address pair identifying the path a connection uses.
///
/// Used as the lookup key for server-side connections that have zero-length CIDs.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct FourTuple {
    /// Address of the remote peer.
    remote: SocketAddr,
    /// Local IP address, when known. Only the IP is stored, not a port.
    local_ip: Option<IpAddr>,
}