1use std::{
2 collections::{HashMap, VecDeque, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21 Side, Transmit, TransportConfig, TransportError,
22 cid_generator::ConnectionIdGenerator,
23 coding::BufMutExt,
24 config::{ClientConfig, EndpointConfig, ServerConfig},
25 connection::{Connection, ConnectionError, SideArgs},
26 crypto::{self, Keys, UnsupportedVersion},
27 frame,
28 nat_traversal_api::PeerId,
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31 PacketNumber, PartialDecode, ProtectedInitialHeader,
32 },
33 relay::RelayStatisticsCollector,
34 shared::{
35 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
36 EndpointEvent, EndpointEventInner, IssuedCid,
37 },
38 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
39 transport_parameters::{PreferredAddress, TransportParameters},
40};
41
42#[derive(Debug, Clone)]
44struct RelayQueueItem {
45 target_peer_id: PeerId,
47 frame: frame::PunchMeNow,
49 created_at: Instant,
51 attempts: u32,
53 last_attempt: Option<Instant>,
55}
56
57#[derive(Debug)]
59struct RelayQueue {
60 pending: IndexMap<u64, RelayQueueItem>,
62 next_seq: u64,
64 max_queue_size: usize,
66 request_timeout: Duration,
68 max_retries: u32,
70 retry_interval: Duration,
72 rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
74 max_relays_per_peer: usize,
76 rate_limit_window: Duration,
78}
79
80#[derive(Debug, Default, Clone)]
82pub struct AddressDiscoveryStats {
83 pub frames_sent: u64,
85 pub frames_received: u64,
87 pub addresses_discovered: u64,
89 pub address_changes_detected: u64,
91}
92
93#[derive(Debug, Default, Clone)]
95pub struct RelayStats {
96 pub requests_received: u64,
98 pub requests_relayed: u64,
100 pub requests_failed: u64,
102 pub requests_dropped: u64,
104 pub requests_timed_out: u64,
106 pub requests_rate_limited: u64,
108 pub current_queue_size: usize,
110}
111
112impl RelayQueue {
113 fn new() -> Self {
115 Self {
116 pending: IndexMap::new(),
117 next_seq: 0,
118 max_queue_size: 1000, request_timeout: Duration::from_secs(30), max_retries: 3,
121 retry_interval: Duration::from_millis(500), rate_limiter: HashMap::new(),
123 max_relays_per_peer: 10, rate_limit_window: Duration::from_secs(60), }
126 }
127
128 fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
130 if self.pending.len() >= self.max_queue_size {
132 warn!(
133 "Relay queue full, dropping request for peer {:?}",
134 target_peer_id
135 );
136 return false;
137 }
138
139 if !self.check_rate_limit(target_peer_id, now) {
141 warn!(
142 "Rate limit exceeded for peer {:?}, dropping relay request",
143 target_peer_id
144 );
145 return false;
146 }
147
148 let item = RelayQueueItem {
149 target_peer_id,
150 frame,
151 created_at: now,
152 attempts: 0,
153 last_attempt: None,
154 };
155
156 let seq = self.next_seq;
157 self.next_seq += 1;
158 self.pending.insert(seq, item);
159
160 self.record_relay_request(target_peer_id, now);
162
163 trace!(
164 "Queued relay request for peer {:?}, queue size: {}",
165 target_peer_id,
166 self.pending.len()
167 );
168 true
169 }
170
171 fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
173 self.cleanup_rate_limiter(now);
175
176 if let Some(requests) = self.rate_limiter.get(&peer_id) {
178 requests.len() < self.max_relays_per_peer
179 } else {
180 true }
182 }
183
184 fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
186 self.rate_limiter.entry(peer_id).or_default().push_back(now);
187 }
188
189 fn cleanup_rate_limiter(&mut self, now: Instant) {
191 self.rate_limiter.retain(|_, requests| {
192 requests.retain(|&request_time| {
193 now.saturating_duration_since(request_time) <= self.rate_limit_window
194 });
195 !requests.is_empty()
196 });
197 }
198
199 fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
201 let mut expired_keys = Vec::new();
203 let mut ready_key = None;
204
205 for (seq, item) in &self.pending {
206 if now.saturating_duration_since(item.created_at) > self.request_timeout {
208 expired_keys.push(*seq);
209 continue;
210 }
211
212 if item.attempts == 0
214 || item
215 .last_attempt
216 .is_none_or(|last| now.saturating_duration_since(last) >= self.retry_interval)
217 {
218 ready_key = Some(*seq);
219 break;
220 }
221 }
222
223 for key in expired_keys {
225 if let Some(expired) = self.pending.shift_remove(&key) {
226 debug!(
227 "Relay request for peer {:?} timed out after {:?}",
228 expired.target_peer_id,
229 now.saturating_duration_since(expired.created_at)
230 );
231 }
232 }
233
234 if let Some(key) = ready_key {
236 if let Some(mut item) = self.pending.shift_remove(&key) {
237 item.attempts += 1;
238 item.last_attempt = Some(now);
239 return Some(item);
240 }
241 }
242
243 None
244 }
245
246 fn requeue_failed(&mut self, item: RelayQueueItem) {
248 if item.attempts < self.max_retries {
249 trace!(
250 "Requeuing failed relay request for peer {:?}, attempt {}/{}",
251 item.target_peer_id, item.attempts, self.max_retries
252 );
253 let seq = self.next_seq;
254 self.next_seq += 1;
255 self.pending.insert(seq, item);
256 } else {
257 debug!(
258 "Dropping relay request for peer {:?} after {} failed attempts",
259 item.target_peer_id, item.attempts
260 );
261 }
262 }
263
264 fn cleanup_expired(&mut self, now: Instant) -> usize {
266 let initial_len = self.pending.len();
267
268 let expired_keys: Vec<u64> = self
270 .pending
271 .iter()
272 .filter_map(|(seq, item)| {
273 if now.saturating_duration_since(item.created_at) > self.request_timeout {
274 Some(*seq)
275 } else {
276 None
277 }
278 })
279 .collect();
280
281 for key in expired_keys {
283 if let Some(expired) = self.pending.shift_remove(&key) {
284 debug!(
285 "Removing expired relay request for peer {:?}",
286 expired.target_peer_id
287 );
288 }
289 }
290
291 initial_len - self.pending.len()
292 }
293
294 fn len(&self) -> usize {
296 self.pending.len()
297 }
298}
299
300pub struct Endpoint {
305 rng: StdRng,
306 index: ConnectionIndex,
307 connections: Slab<ConnectionMeta>,
308 local_cid_generator: Box<dyn ConnectionIdGenerator>,
309 config: Arc<EndpointConfig>,
310 server_config: Option<Arc<ServerConfig>>,
311 allow_mtud: bool,
313 last_stateless_reset: Option<Instant>,
315 incoming_buffers: Slab<IncomingBuffer>,
317 all_incoming_buffers_total_bytes: u64,
318 peer_connections: HashMap<PeerId, ConnectionHandle>,
320 relay_queue: RelayQueue,
322 relay_stats: RelayStats,
324 relay_stats_collector: RelayStatisticsCollector,
326 address_discovery_enabled: bool,
328 address_change_callback: Option<Box<dyn Fn(Option<SocketAddr>, SocketAddr) + Send + Sync>>,
330}
331
332impl Endpoint {
333 pub fn new(
344 config: Arc<EndpointConfig>,
345 server_config: Option<Arc<ServerConfig>>,
346 allow_mtud: bool,
347 rng_seed: Option<[u8; 32]>,
348 ) -> Self {
349 let rng_seed = rng_seed.or(config.rng_seed);
350 Self {
351 rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
352 index: ConnectionIndex::default(),
353 connections: Slab::new(),
354 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
355 config,
356 server_config,
357 allow_mtud,
358 last_stateless_reset: None,
359 incoming_buffers: Slab::new(),
360 all_incoming_buffers_total_bytes: 0,
361 peer_connections: HashMap::new(),
362 relay_queue: RelayQueue::new(),
363 relay_stats: RelayStats::default(),
364 relay_stats_collector: RelayStatisticsCollector::new(),
365 address_discovery_enabled: true, address_change_callback: None,
367 }
368 }
369
370 pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
372 self.server_config = server_config;
373 }
374
375 pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
377 self.peer_connections.insert(peer_id, connection_handle);
378 trace!(
379 "Registered peer {:?} with connection {:?}",
380 peer_id, connection_handle
381 );
382 }
383
384 pub fn unregister_peer(&mut self, peer_id: &PeerId) {
386 if let Some(handle) = self.peer_connections.remove(peer_id) {
387 trace!(
388 "Unregistered peer {:?} from connection {:?}",
389 peer_id, handle
390 );
391 }
392 }
393
394 pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
396 self.peer_connections.get(peer_id).copied()
397 }
398
399 pub(crate) fn queue_frame_for_peer(
401 &mut self,
402 peer_id: &PeerId,
403 frame: frame::PunchMeNow,
404 ) -> bool {
405 self.relay_stats.requests_received += 1;
406
407 if let Some(ch) = self.lookup_peer_connection(peer_id) {
408 if self.relay_frame_to_connection(ch, frame.clone()) {
410 self.relay_stats.requests_relayed += 1;
411 self.relay_stats_collector.record_rate_limit(true);
413 trace!(
414 "Immediately relayed frame to peer {:?} via connection {:?}",
415 peer_id, ch
416 );
417 return true;
418 }
419 }
420
421 let now = Instant::now();
423 if self.relay_queue.enqueue(*peer_id, frame, now) {
424 self.relay_stats.current_queue_size = self.relay_queue.len();
425 self.relay_stats_collector.record_rate_limit(true);
427 trace!("Queued relay request for peer {:?}", peer_id);
428 true
429 } else {
430 if !self.relay_queue.check_rate_limit(*peer_id, now) {
432 self.relay_stats.requests_rate_limited += 1;
433 self.relay_stats_collector.record_rate_limit(false);
435 self.relay_stats_collector.record_error("rate_limited");
437 } else {
438 self.relay_stats.requests_dropped += 1;
439 self.relay_stats_collector
441 .record_error("resource_exhausted");
442 }
443 false
444 }
445 }
446
447 fn relay_frame_to_connection(
449 &mut self,
450 ch: ConnectionHandle,
451 frame: frame::PunchMeNow,
452 ) -> bool {
453 let _event = ConnectionEvent(ConnectionEventInner::QueuePunchMeNow(frame));
455 if let Some(_conn) = self.connections.get_mut(ch.0) {
456 }
463 true
467 }
468
469 pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
471 if let Some(connection) = self.connections.get_mut(connection_handle.0) {
472 connection.peer_id = Some(peer_id);
473 self.register_peer(peer_id, connection_handle);
474
475 self.process_queued_relays_for_peer(peer_id);
477 }
478 }
479
480 fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
482 let _now = Instant::now();
483 let mut processed = 0;
484
485 let mut items_to_process = Vec::new();
487 let mut keys_to_remove = Vec::new();
488
489 for (seq, item) in &self.relay_queue.pending {
491 if item.target_peer_id == peer_id {
492 items_to_process.push(item.clone());
493 keys_to_remove.push(*seq);
494 }
495 }
496
497 for key in keys_to_remove {
499 self.relay_queue.pending.shift_remove(&key);
500 }
501
502 for item in items_to_process {
504 if let Some(ch) = self.lookup_peer_connection(&peer_id) {
505 if self.relay_frame_to_connection(ch, item.frame.clone()) {
506 self.relay_stats.requests_relayed += 1;
507 processed += 1;
508 trace!("Processed queued relay for peer {:?}", peer_id);
509 } else {
510 self.relay_queue.requeue_failed(item);
512 self.relay_stats.requests_failed += 1;
513 }
514 }
515 }
516
517 self.relay_stats.current_queue_size = self.relay_queue.len();
518
519 if processed > 0 {
520 debug!(
521 "Processed {} queued relay requests for peer {:?}",
522 processed, peer_id
523 );
524 }
525 }
526
527 pub fn process_relay_queue(&mut self) {
529 let now = Instant::now();
530 let mut processed = 0;
531 let mut failed = 0;
532
533 while let Some(item) = self.relay_queue.next_ready(now) {
535 if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
536 if self.relay_frame_to_connection(ch, item.frame.clone()) {
537 self.relay_stats.requests_relayed += 1;
538 processed += 1;
539 trace!(
540 "Successfully relayed frame to peer {:?}",
541 item.target_peer_id
542 );
543 } else {
544 self.relay_queue.requeue_failed(item);
546 self.relay_stats.requests_failed += 1;
547 self.relay_stats_collector
549 .record_error("connection_failure");
550 failed += 1;
551 }
552 } else {
553 self.relay_queue.requeue_failed(item);
555 self.relay_stats_collector.record_error("peer_not_found");
557 failed += 1;
558 }
559 }
560
561 let expired = self.relay_queue.cleanup_expired(now);
563 if expired > 0 {
564 self.relay_stats.requests_timed_out += expired as u64;
565 for _ in 0..expired {
567 self.relay_stats_collector.record_error("request_timeout");
568 }
569 debug!("Cleaned up {} expired relay requests", expired);
570 }
571
572 self.relay_stats.current_queue_size = self.relay_queue.len();
573
574 if processed > 0 || failed > 0 {
575 trace!(
576 "Relay queue processing: {} processed, {} failed, {} in queue",
577 processed,
578 failed,
579 self.relay_queue.len()
580 );
581 }
582 }
583
584 pub fn relay_stats(&self) -> &RelayStats {
586 &self.relay_stats
587 }
588
589 pub fn comprehensive_relay_stats(&self) -> crate::relay::RelayStatistics {
591 self.relay_stats_collector
593 .update_queue_stats(&self.relay_stats);
594 self.relay_stats_collector.collect_statistics()
595 }
596
597 pub fn relay_stats_collector(&self) -> &RelayStatisticsCollector {
599 &self.relay_stats_collector
600 }
601
602 pub fn relay_queue_len(&self) -> usize {
604 self.relay_queue.len()
605 }
606
607 pub fn handle_event(
611 &mut self,
612 ch: ConnectionHandle,
613 event: EndpointEvent,
614 ) -> Option<ConnectionEvent> {
615 use EndpointEventInner::*;
616 match event.0 {
617 EndpointEventInner::NeedIdentifiers(now, n) => {
618 return Some(self.send_new_identifiers(now, ch, n));
619 }
620 ResetToken(remote, token) => {
621 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
622 self.index.connection_reset_tokens.remove(old.0, old.1);
623 }
624 if self.index.connection_reset_tokens.insert(remote, token, ch) {
625 warn!("duplicate reset token");
626 }
627 }
628 RetireConnectionId(now, seq, allow_more_cids) => {
629 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
630 trace!("peer retired CID {}: {}", seq, cid);
631 self.index.retire(cid);
632 if allow_more_cids {
633 return Some(self.send_new_identifiers(now, ch, 1));
634 }
635 }
636 }
637 RelayPunchMeNow(target_peer_id, punch_me_now) => {
638 let peer_id = PeerId(target_peer_id);
640 if self.queue_frame_for_peer(&peer_id, punch_me_now) {
641 trace!(
642 "Successfully queued PunchMeNow frame for relay to peer {:?}",
643 peer_id
644 );
645 } else {
646 warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
647 }
648 }
649 SendAddressFrame(add_address_frame) => {
650 return Some(ConnectionEvent(ConnectionEventInner::QueueAddAddress(
652 add_address_frame,
653 )));
654 }
655 NatCandidateValidated { address, challenge } => {
656 trace!(
658 "NAT candidate validation succeeded for {} with challenge {:016x}",
659 address, challenge
660 );
661
662 debug!("NAT candidate {} validated successfully", address);
666 }
667 Drained => {
668 if let Some(conn) = self.connections.try_remove(ch.0) {
669 self.index.remove(&conn);
670 if let Some(peer_id) = conn.peer_id {
672 self.peer_connections.remove(&peer_id);
673 trace!("Cleaned up peer connection mapping for {:?}", peer_id);
674 }
675 } else {
676 error!(id = ch.0, "unknown connection drained");
680 }
681 }
682 }
683 None
684 }
685
686 pub fn handle(
688 &mut self,
689 now: Instant,
690 remote: SocketAddr,
691 local_ip: Option<IpAddr>,
692 ecn: Option<EcnCodepoint>,
693 data: BytesMut,
694 buf: &mut Vec<u8>,
695 ) -> Option<DatagramEvent> {
696 let datagram_len = data.len();
698 let event = match PartialDecode::new(
699 data,
700 &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
701 &self.config.supported_versions,
702 self.config.grease_quic_bit,
703 ) {
704 Ok((first_decode, remaining)) => DatagramConnectionEvent {
705 now,
706 remote,
707 ecn,
708 first_decode,
709 remaining,
710 },
711 Err(PacketDecodeError::UnsupportedVersion {
712 src_cid,
713 dst_cid,
714 version,
715 }) => {
716 if self.server_config.is_none() {
717 debug!("dropping packet with unsupported version");
718 return None;
719 }
720 trace!("sending version negotiation");
721 Header::VersionNegotiate {
723 random: self.rng.r#gen::<u8>() | 0x40,
724 src_cid: dst_cid,
725 dst_cid: src_cid,
726 }
727 .encode(buf);
728 buf.write::<u32>(match version {
730 0x0a1a_2a3a => 0x0a1a_2a4a,
731 _ => 0x0a1a_2a3a,
732 });
733 for &version in &self.config.supported_versions {
734 buf.write(version);
735 }
736 return Some(DatagramEvent::Response(Transmit {
737 destination: remote,
738 ecn: None,
739 size: buf.len(),
740 segment_size: None,
741 src_ip: local_ip,
742 }));
743 }
744 Err(e) => {
745 trace!("malformed header: {}", e);
746 return None;
747 }
748 };
749
750 let addresses = FourTuple { remote, local_ip };
751 let dst_cid = event.first_decode.dst_cid();
752
753 if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
754 match route_to {
756 RouteDatagramTo::Incoming(incoming_idx) => {
757 let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
758 let config = &self.server_config.as_ref().unwrap();
759
760 if incoming_buffer
761 .total_bytes
762 .checked_add(datagram_len as u64)
763 .is_some_and(|n| n <= config.incoming_buffer_size)
764 && self
765 .all_incoming_buffers_total_bytes
766 .checked_add(datagram_len as u64)
767 .is_some_and(|n| n <= config.incoming_buffer_size_total)
768 {
769 incoming_buffer.datagrams.push(event);
770 incoming_buffer.total_bytes += datagram_len as u64;
771 self.all_incoming_buffers_total_bytes += datagram_len as u64;
772 }
773
774 None
775 }
776 RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
777 ch,
778 ConnectionEvent(ConnectionEventInner::Datagram(event)),
779 )),
780 }
781 } else if event.first_decode.initial_header().is_some() {
782 self.handle_first_packet(datagram_len, event, addresses, buf)
785 } else if event.first_decode.has_long_header() {
786 debug!(
787 "ignoring non-initial packet for unknown connection {}",
788 dst_cid
789 );
790 None
791 } else if !event.first_decode.is_initial()
792 && self.local_cid_generator.validate(dst_cid).is_err()
793 {
794 debug!("dropping packet with invalid CID");
798 None
799 } else if dst_cid.is_empty() {
800 trace!("dropping unrecognized short packet without ID");
801 None
802 } else {
803 self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
804 .map(DatagramEvent::Response)
805 }
806 }
807
808 fn stateless_reset(
809 &mut self,
810 now: Instant,
811 inciting_dgram_len: usize,
812 addresses: FourTuple,
813 dst_cid: ConnectionId,
814 buf: &mut Vec<u8>,
815 ) -> Option<Transmit> {
816 if self
817 .last_stateless_reset
818 .is_some_and(|last| last + self.config.min_reset_interval > now)
819 {
820 debug!("ignoring unexpected packet within minimum stateless reset interval");
821 return None;
822 }
823
824 const MIN_PADDING_LEN: usize = 5;
826
827 let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
830 Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
831 _ => {
832 debug!(
833 "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
834 inciting_dgram_len
835 );
836 return None;
837 }
838 };
839
840 debug!(
841 "sending stateless reset for {} to {}",
842 dst_cid, addresses.remote
843 );
844 self.last_stateless_reset = Some(now);
845 const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
847 let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
848 max_padding_len
849 } else {
850 self.rng.gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
851 };
852 buf.reserve(padding_len + RESET_TOKEN_SIZE);
853 buf.resize(padding_len, 0);
854 self.rng.fill_bytes(&mut buf[0..padding_len]);
855 buf[0] = 0b0100_0000 | (buf[0] >> 2);
856 buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
857
858 debug_assert!(buf.len() < inciting_dgram_len);
859
860 Some(Transmit {
861 destination: addresses.remote,
862 ecn: None,
863 size: buf.len(),
864 segment_size: None,
865 src_ip: addresses.local_ip,
866 })
867 }
868
869 pub fn connect(
871 &mut self,
872 now: Instant,
873 config: ClientConfig,
874 remote: SocketAddr,
875 server_name: &str,
876 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
877 if self.cids_exhausted() {
878 return Err(ConnectError::CidsExhausted);
879 }
880 if remote.port() == 0 || remote.ip().is_unspecified() {
881 return Err(ConnectError::InvalidRemoteAddress(remote));
882 }
883 if !self.config.supported_versions.contains(&config.version) {
884 return Err(ConnectError::UnsupportedVersion);
885 }
886
887 let remote_id = (config.initial_dst_cid_provider)();
888 trace!(initial_dcid = %remote_id);
889
890 let ch = ConnectionHandle(self.connections.vacant_key());
891 let loc_cid = self.new_cid(ch);
892 let params = TransportParameters::new(
893 &config.transport,
894 &self.config,
895 self.local_cid_generator.as_ref(),
896 loc_cid,
897 None,
898 &mut self.rng,
899 );
900 let tls = config
901 .crypto
902 .start_session(config.version, server_name, ¶ms)?;
903
904 let conn = self.add_connection(
905 ch,
906 config.version,
907 remote_id,
908 loc_cid,
909 remote_id,
910 FourTuple {
911 remote,
912 local_ip: None,
913 },
914 now,
915 tls,
916 config.transport,
917 SideArgs::Client {
918 token_store: config.token_store,
919 server_name: server_name.into(),
920 },
921 );
922 Ok((ch, conn))
923 }
924
925 fn send_new_identifiers(
926 &mut self,
927 now: Instant,
928 ch: ConnectionHandle,
929 num: u64,
930 ) -> ConnectionEvent {
931 let mut ids = vec![];
932 for _ in 0..num {
933 let id = self.new_cid(ch);
934 let meta = &mut self.connections[ch];
935 let sequence = meta.cids_issued;
936 meta.cids_issued += 1;
937 meta.loc_cids.insert(sequence, id);
938 ids.push(IssuedCid {
939 sequence,
940 id,
941 reset_token: ResetToken::new(&*self.config.reset_key, id),
942 });
943 }
944 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
945 }
946
947 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
949 loop {
950 let cid = self.local_cid_generator.generate_cid();
951 if cid.is_empty() {
952 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
954 return cid;
955 }
956 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
957 e.insert(ch);
958 break cid;
959 }
960 }
961 }
962
963 fn handle_first_packet(
964 &mut self,
965 datagram_len: usize,
966 event: DatagramConnectionEvent,
967 addresses: FourTuple,
968 buf: &mut Vec<u8>,
969 ) -> Option<DatagramEvent> {
970 let dst_cid = event.first_decode.dst_cid();
971 let header = event.first_decode.initial_header().unwrap();
972
973 let Some(server_config) = &self.server_config else {
974 debug!("packet for unrecognized connection {}", dst_cid);
975 return self
976 .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
977 .map(DatagramEvent::Response);
978 };
979
980 if datagram_len < MIN_INITIAL_SIZE as usize {
981 debug!("ignoring short initial for connection {}", dst_cid);
982 return None;
983 }
984
985 let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
986 Ok(keys) => keys,
987 Err(UnsupportedVersion) => {
988 debug!(
991 "ignoring initial packet version {:#x} unsupported by cryptographic layer",
992 header.version
993 );
994 return None;
995 }
996 };
997
998 if let Err(reason) = self.early_validate_first_packet(header) {
999 return Some(DatagramEvent::Response(self.initial_close(
1000 header.version,
1001 addresses,
1002 &crypto,
1003 &header.src_cid,
1004 reason,
1005 buf,
1006 )));
1007 }
1008
1009 let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
1010 Ok(packet) => packet,
1011 Err(e) => {
1012 trace!("unable to decode initial packet: {}", e);
1013 return None;
1014 }
1015 };
1016
1017 if !packet.reserved_bits_valid() {
1018 debug!("dropping connection attempt with invalid reserved bits");
1019 return None;
1020 }
1021
1022 let Header::Initial(header) = packet.header else {
1023 panic!("non-initial packet in handle_first_packet()");
1024 };
1025
1026 let server_config = self.server_config.as_ref().unwrap().clone();
1027
1028 let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
1029 Ok(token) => token,
1030 Err(InvalidRetryTokenError) => {
1031 debug!("rejecting invalid retry token");
1032 return Some(DatagramEvent::Response(self.initial_close(
1033 header.version,
1034 addresses,
1035 &crypto,
1036 &header.src_cid,
1037 TransportError::INVALID_TOKEN(""),
1038 buf,
1039 )));
1040 }
1041 };
1042
1043 let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
1044 self.index
1045 .insert_initial_incoming(header.dst_cid, incoming_idx);
1046
1047 Some(DatagramEvent::NewConnection(Incoming {
1048 received_at: event.now,
1049 addresses,
1050 ecn: event.ecn,
1051 packet: InitialPacket {
1052 header,
1053 header_data: packet.header_data,
1054 payload: packet.payload,
1055 },
1056 rest: event.remaining,
1057 crypto,
1058 token,
1059 incoming_idx,
1060 improper_drop_warner: IncomingImproperDropWarner,
1061 }))
1062 }
1063
1064 #[allow(clippy::result_large_err)]
1067 pub fn accept(
1068 &mut self,
1069 mut incoming: Incoming,
1070 now: Instant,
1071 buf: &mut Vec<u8>,
1072 server_config: Option<Arc<ServerConfig>>,
1073 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
1074 let remote_address_validated = incoming.remote_address_validated();
1075 incoming.improper_drop_warner.dismiss();
1076 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1077 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1078
1079 let packet_number = incoming.packet.header.number.expand(0);
1080 let InitialHeader {
1081 src_cid,
1082 dst_cid,
1083 version,
1084 ..
1085 } = incoming.packet.header;
1086 let server_config =
1087 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
1088
1089 if server_config
1090 .transport
1091 .max_idle_timeout
1092 .is_some_and(|timeout| {
1093 incoming.received_at + Duration::from_millis(timeout.into()) <= now
1094 })
1095 {
1096 debug!("abandoning accept of stale initial");
1097 self.index.remove_initial(dst_cid);
1098 return Err(AcceptError {
1099 cause: ConnectionError::TimedOut,
1100 response: None,
1101 });
1102 }
1103
1104 if self.cids_exhausted() {
1105 debug!("refusing connection");
1106 self.index.remove_initial(dst_cid);
1107 return Err(AcceptError {
1108 cause: ConnectionError::CidsExhausted,
1109 response: Some(self.initial_close(
1110 version,
1111 incoming.addresses,
1112 &incoming.crypto,
1113 &src_cid,
1114 TransportError::CONNECTION_REFUSED(""),
1115 buf,
1116 )),
1117 });
1118 }
1119
1120 if incoming
1121 .crypto
1122 .packet
1123 .remote
1124 .decrypt(
1125 packet_number,
1126 &incoming.packet.header_data,
1127 &mut incoming.packet.payload,
1128 )
1129 .is_err()
1130 {
1131 debug!(packet_number, "failed to authenticate initial packet");
1132 self.index.remove_initial(dst_cid);
1133 return Err(AcceptError {
1134 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1135 response: None,
1136 });
1137 };
1138
1139 let ch = ConnectionHandle(self.connections.vacant_key());
1140 let loc_cid = self.new_cid(ch);
1141 let mut params = TransportParameters::new(
1142 &server_config.transport,
1143 &self.config,
1144 self.local_cid_generator.as_ref(),
1145 loc_cid,
1146 Some(&server_config),
1147 &mut self.rng,
1148 );
1149 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1150 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1151 params.retry_src_cid = incoming.token.retry_src_cid;
1152 let mut pref_addr_cid = None;
1153 if server_config.has_preferred_address() {
1154 let cid = self.new_cid(ch);
1155 pref_addr_cid = Some(cid);
1156 params.preferred_address = Some(PreferredAddress {
1157 address_v4: server_config.preferred_address_v4,
1158 address_v6: server_config.preferred_address_v6,
1159 connection_id: cid,
1160 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1161 });
1162 }
1163
1164 let tls = server_config.crypto.clone().start_session(version, ¶ms);
1165 let transport_config = server_config.transport.clone();
1166 let mut conn = self.add_connection(
1167 ch,
1168 version,
1169 dst_cid,
1170 loc_cid,
1171 src_cid,
1172 incoming.addresses,
1173 incoming.received_at,
1174 tls,
1175 transport_config,
1176 SideArgs::Server {
1177 server_config,
1178 pref_addr_cid,
1179 path_validated: remote_address_validated,
1180 },
1181 );
1182 self.index.insert_initial(dst_cid, ch);
1183
1184 match conn.handle_first_packet(
1185 incoming.received_at,
1186 incoming.addresses.remote,
1187 incoming.ecn,
1188 packet_number,
1189 incoming.packet,
1190 incoming.rest,
1191 ) {
1192 Ok(()) => {
1193 trace!(id = ch.0, icid = %dst_cid, "new connection");
1194
1195 for event in incoming_buffer.datagrams {
1196 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1197 }
1198
1199 Ok((ch, conn))
1200 }
1201 Err(e) => {
1202 debug!("handshake failed: {}", e);
1203 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1204 let response = match e {
1205 ConnectionError::TransportError(ref e) => Some(self.initial_close(
1206 version,
1207 incoming.addresses,
1208 &incoming.crypto,
1209 &src_cid,
1210 e.clone(),
1211 buf,
1212 )),
1213 _ => None,
1214 };
1215 Err(AcceptError { cause: e, response })
1216 }
1217 }
1218 }
1219
1220 fn early_validate_first_packet(
1222 &mut self,
1223 header: &ProtectedInitialHeader,
1224 ) -> Result<(), TransportError> {
1225 let config = &self.server_config.as_ref().unwrap();
1226 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1227 return Err(TransportError::CONNECTION_REFUSED(""));
1228 }
1229
1230 if header.dst_cid.len() < 8
1235 && (header.token_pos.is_empty()
1236 || header.dst_cid.len() != self.local_cid_generator.cid_len())
1237 {
1238 debug!(
1239 "rejecting connection due to invalid DCID length {}",
1240 header.dst_cid.len()
1241 );
1242 return Err(TransportError::PROTOCOL_VIOLATION(
1243 "invalid destination CID length",
1244 ));
1245 }
1246
1247 Ok(())
1248 }
1249
1250 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1252 self.clean_up_incoming(&incoming);
1253 incoming.improper_drop_warner.dismiss();
1254
1255 self.initial_close(
1256 incoming.packet.header.version,
1257 incoming.addresses,
1258 &incoming.crypto,
1259 &incoming.packet.header.src_cid,
1260 TransportError::CONNECTION_REFUSED(""),
1261 buf,
1262 )
1263 }
1264
1265 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1269 if !incoming.may_retry() {
1270 return Err(RetryError(Box::new(incoming)));
1271 }
1272
1273 self.clean_up_incoming(&incoming);
1274 incoming.improper_drop_warner.dismiss();
1275
1276 let server_config = self.server_config.as_ref().unwrap();
1277
1278 let loc_cid = self.local_cid_generator.generate_cid();
1285
1286 let payload = TokenPayload::Retry {
1287 address: incoming.addresses.remote,
1288 orig_dst_cid: incoming.packet.header.dst_cid,
1289 issued: server_config.time_source.now(),
1290 };
1291 let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1292
1293 let header = Header::Retry {
1294 src_cid: loc_cid,
1295 dst_cid: incoming.packet.header.src_cid,
1296 version: incoming.packet.header.version,
1297 };
1298
1299 let encode = header.encode(buf);
1300 buf.put_slice(&token);
1301 buf.extend_from_slice(&server_config.crypto.retry_tag(
1302 incoming.packet.header.version,
1303 &incoming.packet.header.dst_cid,
1304 buf,
1305 ));
1306 encode.finish(buf, &*incoming.crypto.header.local, None);
1307
1308 Ok(Transmit {
1309 destination: incoming.addresses.remote,
1310 ecn: None,
1311 size: buf.len(),
1312 segment_size: None,
1313 src_ip: incoming.addresses.local_ip,
1314 })
1315 }
1316
1317 pub fn ignore(&mut self, incoming: Incoming) {
1322 self.clean_up_incoming(&incoming);
1323 incoming.improper_drop_warner.dismiss();
1324 }
1325
1326 fn clean_up_incoming(&mut self, incoming: &Incoming) {
1328 self.index.remove_initial(incoming.packet.header.dst_cid);
1329 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1330 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1331 }
1332
1333 fn add_connection(
1334 &mut self,
1335 ch: ConnectionHandle,
1336 version: u32,
1337 init_cid: ConnectionId,
1338 loc_cid: ConnectionId,
1339 rem_cid: ConnectionId,
1340 addresses: FourTuple,
1341 now: Instant,
1342 tls: Box<dyn crypto::Session>,
1343 transport_config: Arc<TransportConfig>,
1344 side_args: SideArgs,
1345 ) -> Connection {
1346 let mut rng_seed = [0; 32];
1347 self.rng.fill_bytes(&mut rng_seed);
1348 let side = side_args.side();
1349 let pref_addr_cid = side_args.pref_addr_cid();
1350 let conn = Connection::new(
1351 self.config.clone(),
1352 transport_config,
1353 init_cid,
1354 loc_cid,
1355 rem_cid,
1356 addresses.remote,
1357 addresses.local_ip,
1358 tls,
1359 self.local_cid_generator.as_ref(),
1360 now,
1361 version,
1362 self.allow_mtud,
1363 rng_seed,
1364 side_args,
1365 );
1366
1367 let mut cids_issued = 0;
1368 let mut loc_cids = FxHashMap::default();
1369
1370 loc_cids.insert(cids_issued, loc_cid);
1371 cids_issued += 1;
1372
1373 if let Some(cid) = pref_addr_cid {
1374 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1375 loc_cids.insert(cids_issued, cid);
1376 cids_issued += 1;
1377 }
1378
1379 let id = self.connections.insert(ConnectionMeta {
1380 init_cid,
1381 cids_issued,
1382 loc_cids,
1383 addresses,
1384 side,
1385 reset_token: None,
1386 peer_id: None,
1387 });
1388 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1389
1390 self.index.insert_conn(addresses, loc_cid, ch, side);
1391
1392 conn
1393 }
1394
1395 fn initial_close(
1396 &mut self,
1397 version: u32,
1398 addresses: FourTuple,
1399 crypto: &Keys,
1400 remote_id: &ConnectionId,
1401 reason: TransportError,
1402 buf: &mut Vec<u8>,
1403 ) -> Transmit {
1404 let local_id = self.local_cid_generator.generate_cid();
1408 let number = PacketNumber::U8(0);
1409 let header = Header::Initial(InitialHeader {
1410 dst_cid: *remote_id,
1411 src_cid: local_id,
1412 number,
1413 token: Bytes::new(),
1414 version,
1415 });
1416
1417 let partial_encode = header.encode(buf);
1418 let max_len =
1419 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1420 frame::Close::from(reason).encode(buf, max_len);
1421 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1422 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1423 Transmit {
1424 destination: addresses.remote,
1425 ecn: None,
1426 size: buf.len(),
1427 segment_size: None,
1428 src_ip: addresses.local_ip,
1429 }
1430 }
1431
1432 pub fn config(&self) -> &EndpointConfig {
1434 &self.config
1435 }
1436
1437 pub fn enable_address_discovery(&mut self, enabled: bool) {
1444 self.address_discovery_enabled = enabled;
1445 }
1448
1449 pub fn address_discovery_enabled(&self) -> bool {
1451 self.address_discovery_enabled
1452 }
1453
1454 pub fn discovered_addresses(&self) -> Vec<SocketAddr> {
1462 Vec::new()
1464 }
1465
1466 pub fn set_address_change_callback<F>(&mut self, callback: F)
1471 where
1472 F: Fn(Option<SocketAddr>, SocketAddr) + Send + Sync + 'static,
1473 {
1474 self.address_change_callback = Some(Box::new(callback));
1475 }
1476
1477 pub fn clear_address_change_callback(&mut self) {
1479 self.address_change_callback = None;
1480 }
1481
1482 pub fn address_discovery_stats(&self) -> AddressDiscoveryStats {
1487 AddressDiscoveryStats::default()
1489 }
1490
1491 pub fn open_connections(&self) -> usize {
1493 self.connections.len()
1494 }
1495
1496 pub fn incoming_buffer_bytes(&self) -> u64 {
1499 self.all_incoming_buffers_total_bytes
1500 }
1501
1502 #[cfg(test)]
1503 pub(crate) fn known_connections(&self) -> usize {
1504 let x = self.connections.len();
1505 debug_assert_eq!(x, self.index.connection_ids_initial.len());
1506 debug_assert!(x >= self.index.connection_reset_tokens.0.len());
1508 debug_assert!(x >= self.index.incoming_connection_remotes.len());
1510 debug_assert!(x >= self.index.outgoing_connection_remotes.len());
1511 x
1512 }
1513
1514 #[cfg(test)]
1515 pub(crate) fn known_cids(&self) -> usize {
1516 self.index.connection_ids.len()
1517 }
1518
1519 fn cids_exhausted(&self) -> bool {
1524 self.local_cid_generator.cid_len() <= 4
1525 && self.local_cid_generator.cid_len() != 0
1526 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1527 - self.index.connection_ids.len())
1528 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1529 }
1530}
1531
1532impl fmt::Debug for Endpoint {
1533 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1534 fmt.debug_struct("Endpoint")
1535 .field("rng", &self.rng)
1536 .field("index", &self.index)
1537 .field("connections", &self.connections)
1538 .field("config", &self.config)
1539 .field("server_config", &self.server_config)
1540 .field("incoming_buffers.len", &self.incoming_buffers.len())
1542 .field(
1543 "all_incoming_buffers_total_bytes",
1544 &self.all_incoming_buffers_total_bytes,
1545 )
1546 .finish()
1547 }
1548}
1549
1550#[derive(Default)]
1552struct IncomingBuffer {
1553 datagrams: Vec<DatagramConnectionEvent>,
1554 total_bytes: u64,
1555}
1556
1557#[derive(Copy, Clone, Debug)]
1559enum RouteDatagramTo {
1560 Incoming(usize),
1561 Connection(ConnectionHandle),
1562}
1563
1564#[derive(Default, Debug)]
1566struct ConnectionIndex {
1567 connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1573 connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1577 incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1581 outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1590 connection_reset_tokens: ResetTokenTable,
1595}
1596
1597impl ConnectionIndex {
1598 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1600 if dst_cid.is_empty() {
1601 return;
1602 }
1603 self.connection_ids_initial
1604 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1605 }
1606
1607 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1609 if dst_cid.is_empty() {
1610 return;
1611 }
1612 let removed = self.connection_ids_initial.remove(&dst_cid);
1613 debug_assert!(removed.is_some());
1614 }
1615
1616 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1618 if dst_cid.is_empty() {
1619 return;
1620 }
1621 self.connection_ids_initial
1622 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1623 }
1624
1625 fn insert_conn(
1628 &mut self,
1629 addresses: FourTuple,
1630 dst_cid: ConnectionId,
1631 connection: ConnectionHandle,
1632 side: Side,
1633 ) {
1634 match dst_cid.len() {
1635 0 => match side {
1636 Side::Server => {
1637 self.incoming_connection_remotes
1638 .insert(addresses, connection);
1639 }
1640 Side::Client => {
1641 self.outgoing_connection_remotes
1642 .insert(addresses.remote, connection);
1643 }
1644 },
1645 _ => {
1646 self.connection_ids.insert(dst_cid, connection);
1647 }
1648 }
1649 }
1650
1651 fn retire(&mut self, dst_cid: ConnectionId) {
1653 self.connection_ids.remove(&dst_cid);
1654 }
1655
1656 fn remove(&mut self, conn: &ConnectionMeta) {
1658 if conn.side.is_server() {
1659 self.remove_initial(conn.init_cid);
1660 }
1661 for cid in conn.loc_cids.values() {
1662 self.connection_ids.remove(cid);
1663 }
1664 self.incoming_connection_remotes.remove(&conn.addresses);
1665 self.outgoing_connection_remotes
1666 .remove(&conn.addresses.remote);
1667 if let Some((remote, token)) = conn.reset_token {
1668 self.connection_reset_tokens.remove(remote, token);
1669 }
1670 }
1671
1672 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1674 let dst_cid = datagram.dst_cid();
1675 let is_empty_cid = dst_cid.is_empty();
1676
1677 if !is_empty_cid {
1679 if let Some(&ch) = self.connection_ids.get(dst_cid) {
1680 return Some(RouteDatagramTo::Connection(ch));
1681 }
1682 }
1683
1684 if datagram.is_initial() || datagram.is_0rtt() {
1686 if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1687 return Some(ch);
1688 }
1689 }
1690
1691 if is_empty_cid {
1693 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1695 return Some(RouteDatagramTo::Connection(ch));
1696 }
1697 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1698 return Some(RouteDatagramTo::Connection(ch));
1699 }
1700 }
1701
1702 let data = datagram.data();
1704 if data.len() < RESET_TOKEN_SIZE {
1705 return None;
1706 }
1707 self.connection_reset_tokens
1708 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1709 .cloned()
1710 .map(RouteDatagramTo::Connection)
1711 }
1712}
1713
1714#[derive(Debug)]
1715pub(crate) struct ConnectionMeta {
1716 init_cid: ConnectionId,
1717 cids_issued: u64,
1719 loc_cids: FxHashMap<u64, ConnectionId>,
1720 addresses: FourTuple,
1725 side: Side,
1726 reset_token: Option<(SocketAddr, ResetToken)>,
1729 peer_id: Option<PeerId>,
1731}
1732
1733#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1735pub struct ConnectionHandle(pub usize);
1736
1737impl From<ConnectionHandle> for usize {
1738 fn from(x: ConnectionHandle) -> Self {
1739 x.0
1740 }
1741}
1742
1743impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1744 type Output = ConnectionMeta;
1745 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1746 &self[ch.0]
1747 }
1748}
1749
1750impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1751 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1752 &mut self[ch.0]
1753 }
1754}
1755
1756pub enum DatagramEvent {
1758 ConnectionEvent(ConnectionHandle, ConnectionEvent),
1760 NewConnection(Incoming),
1762 Response(Transmit),
1764}
1765
1766pub struct Incoming {
1768 received_at: Instant,
1769 addresses: FourTuple,
1770 ecn: Option<EcnCodepoint>,
1771 packet: InitialPacket,
1772 rest: Option<BytesMut>,
1773 crypto: Keys,
1774 token: IncomingToken,
1775 incoming_idx: usize,
1776 improper_drop_warner: IncomingImproperDropWarner,
1777}
1778
1779impl Incoming {
1780 pub fn local_ip(&self) -> Option<IpAddr> {
1784 self.addresses.local_ip
1785 }
1786
1787 pub fn remote_address(&self) -> SocketAddr {
1789 self.addresses.remote
1790 }
1791
1792 pub fn remote_address_validated(&self) -> bool {
1800 self.token.validated
1801 }
1802
1803 pub fn may_retry(&self) -> bool {
1808 self.token.retry_src_cid.is_none()
1809 }
1810
1811 pub fn orig_dst_cid(&self) -> &ConnectionId {
1813 &self.token.orig_dst_cid
1814 }
1815}
1816
1817impl fmt::Debug for Incoming {
1818 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1819 f.debug_struct("Incoming")
1820 .field("addresses", &self.addresses)
1821 .field("ecn", &self.ecn)
1822 .field("token", &self.token)
1825 .field("incoming_idx", &self.incoming_idx)
1826 .finish_non_exhaustive()
1828 }
1829}
1830
1831struct IncomingImproperDropWarner;
1832
1833impl IncomingImproperDropWarner {
1834 fn dismiss(self) {
1835 mem::forget(self);
1836 }
1837}
1838
1839impl Drop for IncomingImproperDropWarner {
1840 fn drop(&mut self) {
1841 warn!(
1842 "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1843 (may cause memory leak and eventual inability to accept new connections)"
1844 );
1845 }
1846}
1847
1848#[derive(Debug, Error, Clone, PartialEq, Eq)]
1852pub enum ConnectError {
1853 #[error("endpoint stopping")]
1857 EndpointStopping,
1858 #[error("CIDs exhausted")]
1862 CidsExhausted,
1863 #[error("invalid server name: {0}")]
1865 InvalidServerName(String),
1866 #[error("invalid remote address: {0}")]
1870 InvalidRemoteAddress(SocketAddr),
1871 #[error("no default client config")]
1875 NoDefaultClientConfig,
1876 #[error("unsupported QUIC version")]
1878 UnsupportedVersion,
1879}
1880
1881#[derive(Debug)]
1883pub struct AcceptError {
1884 pub cause: ConnectionError,
1886 pub response: Option<Transmit>,
1888}
1889
1890#[derive(Debug, Error)]
1892#[error("retry() with validated Incoming")]
1893pub struct RetryError(Box<Incoming>);
1894
1895impl RetryError {
1896 pub fn into_incoming(self) -> Incoming {
1898 *self.0
1899 }
1900}
1901
1902#[derive(Default, Debug)]
1907struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1908
1909impl ResetTokenTable {
1910 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1911 self.0
1912 .entry(remote)
1913 .or_default()
1914 .insert(token, ch)
1915 .is_some()
1916 }
1917
1918 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1919 use std::collections::hash_map::Entry;
1920 match self.0.entry(remote) {
1921 Entry::Vacant(_) => {}
1922 Entry::Occupied(mut e) => {
1923 e.get_mut().remove(&token);
1924 if e.get().is_empty() {
1925 e.remove_entry();
1926 }
1927 }
1928 }
1929 }
1930
1931 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1932 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1933 self.0.get(&remote)?.get(&token)
1934 }
1935}
1936
1937#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
1942struct FourTuple {
1943 remote: SocketAddr,
1944 local_ip: Option<IpAddr>,
1946}