1use std::{
2 collections::{HashMap, hash_map, VecDeque},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use indexmap::IndexMap;
11
12use bytes::{BufMut, Bytes, BytesMut};
13use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
14use rustc_hash::FxHashMap;
15use slab::Slab;
16use thiserror::Error;
17use tracing::{debug, error, trace, warn};
18
19use crate::{
20 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
21 Side, Transmit, TransportConfig, TransportError,
22 cid_generator::ConnectionIdGenerator,
23 coding::BufMutExt,
24 config::{ClientConfig, EndpointConfig, ServerConfig},
25 connection::{Connection, ConnectionError, SideArgs},
26 crypto::{self, Keys, UnsupportedVersion},
27 frame,
28 nat_traversal_api::PeerId,
29 packet::{
30 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
31 PacketNumber, PartialDecode, ProtectedInitialHeader,
32 },
33 shared::{
34 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
35 EndpointEvent, EndpointEventInner, IssuedCid,
36 },
37 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
38 transport_parameters::{PreferredAddress, TransportParameters},
39};
40
41#[derive(Debug, Clone)]
43struct RelayQueueItem {
44 target_peer_id: PeerId,
46 frame: frame::PunchMeNow,
48 created_at: Instant,
50 attempts: u32,
52 last_attempt: Option<Instant>,
54}
55
56#[derive(Debug)]
58struct RelayQueue {
59 pending: IndexMap<u64, RelayQueueItem>,
61 next_seq: u64,
63 max_queue_size: usize,
65 request_timeout: Duration,
67 max_retries: u32,
69 retry_interval: Duration,
71 rate_limiter: HashMap<PeerId, VecDeque<Instant>>,
73 max_relays_per_peer: usize,
75 rate_limit_window: Duration,
77}
78
/// Counters describing relay activity on an endpoint.
#[derive(Default, Debug)]
pub struct RelayStats {
    /// Relay requests submitted for queuing or immediate delivery.
    requests_received: u64,
    /// Requests successfully handed to a connection.
    requests_relayed: u64,
    /// Delivery attempts that reported failure.
    requests_failed: u64,
    /// Requests refused at enqueue time for a reason other than rate limiting.
    requests_dropped: u64,
    /// Requests removed from the queue because they expired.
    requests_timed_out: u64,
    /// Requests refused because the target peer exceeded its rate limit.
    requests_rate_limited: u64,
    /// Queue length as of the last update.
    current_queue_size: usize,
}
97
98impl RelayQueue {
99 fn new() -> Self {
101 Self {
102 pending: IndexMap::new(),
103 next_seq: 0,
104 max_queue_size: 1000, request_timeout: Duration::from_secs(30), max_retries: 3,
107 retry_interval: Duration::from_millis(500), rate_limiter: HashMap::new(),
109 max_relays_per_peer: 10, rate_limit_window: Duration::from_secs(60), }
112 }
113
114 fn enqueue(&mut self, target_peer_id: PeerId, frame: frame::PunchMeNow, now: Instant) -> bool {
116 if self.pending.len() >= self.max_queue_size {
118 warn!("Relay queue full, dropping request for peer {:?}", target_peer_id);
119 return false;
120 }
121
122 if !self.check_rate_limit(target_peer_id, now) {
124 warn!("Rate limit exceeded for peer {:?}, dropping relay request", target_peer_id);
125 return false;
126 }
127
128 let item = RelayQueueItem {
129 target_peer_id,
130 frame,
131 created_at: now,
132 attempts: 0,
133 last_attempt: None,
134 };
135
136 let seq = self.next_seq;
137 self.next_seq += 1;
138 self.pending.insert(seq, item);
139
140 self.record_relay_request(target_peer_id, now);
142
143 trace!("Queued relay request for peer {:?}, queue size: {}", target_peer_id, self.pending.len());
144 true
145 }
146
147 fn check_rate_limit(&mut self, peer_id: PeerId, now: Instant) -> bool {
149 self.cleanup_rate_limiter(now);
151
152 if let Some(requests) = self.rate_limiter.get(&peer_id) {
154 requests.len() < self.max_relays_per_peer
155 } else {
156 true }
158 }
159
160 fn record_relay_request(&mut self, peer_id: PeerId, now: Instant) {
162 self.rate_limiter.entry(peer_id).or_insert_with(VecDeque::new).push_back(now);
163 }
164
165 fn cleanup_rate_limiter(&mut self, now: Instant) {
167 self.rate_limiter.retain(|_, requests| {
168 requests.retain(|&request_time| now.saturating_duration_since(request_time) <= self.rate_limit_window);
169 !requests.is_empty()
170 });
171 }
172
173 fn next_ready(&mut self, now: Instant) -> Option<RelayQueueItem> {
175 let mut expired_keys = Vec::new();
177 let mut ready_key = None;
178
179 for (seq, item) in &self.pending {
180 if now.saturating_duration_since(item.created_at) > self.request_timeout {
182 expired_keys.push(*seq);
183 continue;
184 }
185
186 if item.attempts == 0 ||
188 item.last_attempt.map_or(true, |last|
189 now.saturating_duration_since(last) >= self.retry_interval) {
190 ready_key = Some(*seq);
191 break;
192 }
193 }
194
195 for key in expired_keys {
197 if let Some(expired) = self.pending.shift_remove(&key) {
198 debug!("Relay request for peer {:?} timed out after {:?}",
199 expired.target_peer_id,
200 now.saturating_duration_since(expired.created_at));
201 }
202 }
203
204 if let Some(key) = ready_key {
206 if let Some(mut item) = self.pending.shift_remove(&key) {
207 item.attempts += 1;
208 item.last_attempt = Some(now);
209 return Some(item);
210 }
211 }
212
213 None
214 }
215
216 fn requeue_failed(&mut self, item: RelayQueueItem) {
218 if item.attempts < self.max_retries {
219 trace!("Requeuing failed relay request for peer {:?}, attempt {}/{}",
220 item.target_peer_id, item.attempts, self.max_retries);
221 let seq = self.next_seq;
222 self.next_seq += 1;
223 self.pending.insert(seq, item);
224 } else {
225 debug!("Dropping relay request for peer {:?} after {} failed attempts",
226 item.target_peer_id, item.attempts);
227 }
228 }
229
230 fn cleanup_expired(&mut self, now: Instant) -> usize {
232 let initial_len = self.pending.len();
233
234 let expired_keys: Vec<u64> = self.pending.iter()
236 .filter_map(|(seq, item)| {
237 if now.saturating_duration_since(item.created_at) > self.request_timeout {
238 Some(*seq)
239 } else {
240 None
241 }
242 })
243 .collect();
244
245 for key in expired_keys {
247 if let Some(expired) = self.pending.shift_remove(&key) {
248 debug!("Removing expired relay request for peer {:?}", expired.target_peer_id);
249 }
250 }
251
252 initial_len - self.pending.len()
253 }
254
255 fn len(&self) -> usize {
257 self.pending.len()
258 }
259
260}
261
262pub struct Endpoint {
267 rng: StdRng,
268 index: ConnectionIndex,
269 connections: Slab<ConnectionMeta>,
270 local_cid_generator: Box<dyn ConnectionIdGenerator>,
271 config: Arc<EndpointConfig>,
272 server_config: Option<Arc<ServerConfig>>,
273 allow_mtud: bool,
275 last_stateless_reset: Option<Instant>,
277 incoming_buffers: Slab<IncomingBuffer>,
279 all_incoming_buffers_total_bytes: u64,
280 peer_connections: HashMap<PeerId, ConnectionHandle>,
282 relay_queue: RelayQueue,
284 relay_stats: RelayStats,
286}
287
288impl Endpoint {
289 pub fn new(
300 config: Arc<EndpointConfig>,
301 server_config: Option<Arc<ServerConfig>>,
302 allow_mtud: bool,
303 rng_seed: Option<[u8; 32]>,
304 ) -> Self {
305 let rng_seed = rng_seed.or(config.rng_seed);
306 Self {
307 rng: rng_seed.map_or(StdRng::from_entropy(), StdRng::from_seed),
308 index: ConnectionIndex::default(),
309 connections: Slab::new(),
310 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
311 config,
312 server_config,
313 allow_mtud,
314 last_stateless_reset: None,
315 incoming_buffers: Slab::new(),
316 all_incoming_buffers_total_bytes: 0,
317 peer_connections: HashMap::new(),
318 relay_queue: RelayQueue::new(),
319 relay_stats: RelayStats::default(),
320 }
321 }
322
323 pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
325 self.server_config = server_config;
326 }
327
328 pub fn register_peer(&mut self, peer_id: PeerId, connection_handle: ConnectionHandle) {
330 self.peer_connections.insert(peer_id, connection_handle);
331 trace!("Registered peer {:?} with connection {:?}", peer_id, connection_handle);
332 }
333
334 pub fn unregister_peer(&mut self, peer_id: &PeerId) {
336 if let Some(handle) = self.peer_connections.remove(peer_id) {
337 trace!("Unregistered peer {:?} from connection {:?}", peer_id, handle);
338 }
339 }
340
341 pub fn lookup_peer_connection(&self, peer_id: &PeerId) -> Option<ConnectionHandle> {
343 self.peer_connections.get(peer_id).copied()
344 }
345
346 pub(crate) fn queue_frame_for_peer(&mut self, peer_id: &PeerId, frame: frame::PunchMeNow) -> bool {
348 self.relay_stats.requests_received += 1;
349
350 if let Some(ch) = self.lookup_peer_connection(peer_id) {
351 if self.relay_frame_to_connection(ch, frame.clone()) {
353 self.relay_stats.requests_relayed += 1;
354 trace!("Immediately relayed frame to peer {:?} via connection {:?}", peer_id, ch);
355 return true;
356 }
357 }
358
359 let now = Instant::now();
361 if self.relay_queue.enqueue(*peer_id, frame, now) {
362 self.relay_stats.current_queue_size = self.relay_queue.len();
363 trace!("Queued relay request for peer {:?}", peer_id);
364 true
365 } else {
366 if !self.relay_queue.check_rate_limit(*peer_id, now) {
368 self.relay_stats.requests_rate_limited += 1;
369 } else {
370 self.relay_stats.requests_dropped += 1;
371 }
372 false
373 }
374 }
375
376 fn relay_frame_to_connection(&mut self, ch: ConnectionHandle, _frame: frame::PunchMeNow) -> bool {
378 trace!("Would relay frame to connection {:?}", ch);
384 true
385 }
386
387 pub fn set_connection_peer_id(&mut self, connection_handle: ConnectionHandle, peer_id: PeerId) {
389 if let Some(connection) = self.connections.get_mut(connection_handle.0) {
390 connection.peer_id = Some(peer_id);
391 self.register_peer(peer_id, connection_handle);
392
393 self.process_queued_relays_for_peer(peer_id);
395 }
396 }
397
398 fn process_queued_relays_for_peer(&mut self, peer_id: PeerId) {
400 let _now = Instant::now();
401 let mut processed = 0;
402
403 let mut items_to_process = Vec::new();
405 let mut keys_to_remove = Vec::new();
406
407 for (seq, item) in &self.relay_queue.pending {
409 if item.target_peer_id == peer_id {
410 items_to_process.push(item.clone());
411 keys_to_remove.push(*seq);
412 }
413 }
414
415 for key in keys_to_remove {
417 self.relay_queue.pending.shift_remove(&key);
418 }
419
420 for item in items_to_process {
422 if let Some(ch) = self.lookup_peer_connection(&peer_id) {
423 if self.relay_frame_to_connection(ch, item.frame.clone()) {
424 self.relay_stats.requests_relayed += 1;
425 processed += 1;
426 trace!("Processed queued relay for peer {:?}", peer_id);
427 } else {
428 self.relay_queue.requeue_failed(item);
430 self.relay_stats.requests_failed += 1;
431 }
432 }
433 }
434
435 self.relay_stats.current_queue_size = self.relay_queue.len();
436
437 if processed > 0 {
438 debug!("Processed {} queued relay requests for peer {:?}", processed, peer_id);
439 }
440 }
441
442 pub fn process_relay_queue(&mut self) {
444 let now = Instant::now();
445 let mut processed = 0;
446 let mut failed = 0;
447
448 while let Some(item) = self.relay_queue.next_ready(now) {
450 if let Some(ch) = self.lookup_peer_connection(&item.target_peer_id) {
451 if self.relay_frame_to_connection(ch, item.frame.clone()) {
452 self.relay_stats.requests_relayed += 1;
453 processed += 1;
454 trace!("Successfully relayed frame to peer {:?}", item.target_peer_id);
455 } else {
456 self.relay_queue.requeue_failed(item);
458 self.relay_stats.requests_failed += 1;
459 failed += 1;
460 }
461 } else {
462 self.relay_queue.requeue_failed(item);
464 failed += 1;
465 }
466 }
467
468 let expired = self.relay_queue.cleanup_expired(now);
470 if expired > 0 {
471 self.relay_stats.requests_timed_out += expired as u64;
472 debug!("Cleaned up {} expired relay requests", expired);
473 }
474
475 self.relay_stats.current_queue_size = self.relay_queue.len();
476
477 if processed > 0 || failed > 0 {
478 trace!("Relay queue processing: {} processed, {} failed, {} in queue",
479 processed, failed, self.relay_queue.len());
480 }
481 }
482
483 pub fn relay_stats(&self) -> &RelayStats {
485 &self.relay_stats
486 }
487
488 pub fn relay_queue_len(&self) -> usize {
490 self.relay_queue.len()
491 }
492
493 pub fn handle_event(
497 &mut self,
498 ch: ConnectionHandle,
499 event: EndpointEvent,
500 ) -> Option<ConnectionEvent> {
501 use EndpointEventInner::*;
502 match event.0 {
503 EndpointEventInner::NeedIdentifiers(now, n) => {
504 return Some(self.send_new_identifiers(now, ch, n));
505 }
506 ResetToken(remote, token) => {
507 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
508 self.index.connection_reset_tokens.remove(old.0, old.1);
509 }
510 if self.index.connection_reset_tokens.insert(remote, token, ch) {
511 warn!("duplicate reset token");
512 }
513 }
514 RetireConnectionId(now, seq, allow_more_cids) => {
515 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
516 trace!("peer retired CID {}: {}", seq, cid);
517 self.index.retire(cid);
518 if allow_more_cids {
519 return Some(self.send_new_identifiers(now, ch, 1));
520 }
521 }
522 }
523 RelayPunchMeNow(target_peer_id, punch_me_now) => {
524 let peer_id = PeerId(target_peer_id);
526 if self.queue_frame_for_peer(&peer_id, punch_me_now) {
527 trace!("Successfully queued PunchMeNow frame for relay to peer {:?}", peer_id);
528 } else {
529 warn!("Failed to queue PunchMeNow relay for peer {:?}", peer_id);
530 }
531 }
532 SendAddressFrame(add_address_frame) => {
533 trace!("Sending ADD_ADDRESS frame: seq={}, addr={}, priority={}",
535 add_address_frame.sequence, add_address_frame.address, add_address_frame.priority);
536
537 debug!("ADD_ADDRESS frame ready for transmission: {:?}", add_address_frame);
540 }
541 NatCandidateValidated { address, challenge } => {
542 trace!("NAT candidate validation succeeded for {} with challenge {:016x}", address, challenge);
544
545 debug!("NAT candidate {} validated successfully", address);
549 }
550 Drained => {
551 if let Some(conn) = self.connections.try_remove(ch.0) {
552 self.index.remove(&conn);
553 if let Some(peer_id) = conn.peer_id {
555 self.peer_connections.remove(&peer_id);
556 trace!("Cleaned up peer connection mapping for {:?}", peer_id);
557 }
558 } else {
559 error!(id = ch.0, "unknown connection drained");
563 }
564 }
565 }
566 None
567 }
568
569 pub fn handle(
571 &mut self,
572 now: Instant,
573 remote: SocketAddr,
574 local_ip: Option<IpAddr>,
575 ecn: Option<EcnCodepoint>,
576 data: BytesMut,
577 buf: &mut Vec<u8>,
578 ) -> Option<DatagramEvent> {
579 let datagram_len = data.len();
581 let event = match PartialDecode::new(
582 data,
583 &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
584 &self.config.supported_versions,
585 self.config.grease_quic_bit,
586 ) {
587 Ok((first_decode, remaining)) => DatagramConnectionEvent {
588 now,
589 remote,
590 ecn,
591 first_decode,
592 remaining,
593 },
594 Err(PacketDecodeError::UnsupportedVersion {
595 src_cid,
596 dst_cid,
597 version,
598 }) => {
599 if self.server_config.is_none() {
600 debug!("dropping packet with unsupported version");
601 return None;
602 }
603 trace!("sending version negotiation");
604 Header::VersionNegotiate {
606 random: self.rng.gen::<u8>() | 0x40,
607 src_cid: dst_cid,
608 dst_cid: src_cid,
609 }
610 .encode(buf);
611 buf.write::<u32>(match version {
613 0x0a1a_2a3a => 0x0a1a_2a4a,
614 _ => 0x0a1a_2a3a,
615 });
616 for &version in &self.config.supported_versions {
617 buf.write(version);
618 }
619 return Some(DatagramEvent::Response(Transmit {
620 destination: remote,
621 ecn: None,
622 size: buf.len(),
623 segment_size: None,
624 src_ip: local_ip,
625 }));
626 }
627 Err(e) => {
628 trace!("malformed header: {}", e);
629 return None;
630 }
631 };
632
633 let addresses = FourTuple { remote, local_ip };
634 let dst_cid = event.first_decode.dst_cid();
635
636 if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
637 match route_to {
639 RouteDatagramTo::Incoming(incoming_idx) => {
640 let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
641 let config = &self.server_config.as_ref().unwrap();
642
643 if incoming_buffer
644 .total_bytes
645 .checked_add(datagram_len as u64)
646 .is_some_and(|n| n <= config.incoming_buffer_size)
647 && self
648 .all_incoming_buffers_total_bytes
649 .checked_add(datagram_len as u64)
650 .is_some_and(|n| n <= config.incoming_buffer_size_total)
651 {
652 incoming_buffer.datagrams.push(event);
653 incoming_buffer.total_bytes += datagram_len as u64;
654 self.all_incoming_buffers_total_bytes += datagram_len as u64;
655 }
656
657 None
658 }
659 RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
660 ch,
661 ConnectionEvent(ConnectionEventInner::Datagram(event)),
662 )),
663 }
664 } else if event.first_decode.initial_header().is_some() {
665 self.handle_first_packet(datagram_len, event, addresses, buf)
668 } else if event.first_decode.has_long_header() {
669 debug!(
670 "ignoring non-initial packet for unknown connection {}",
671 dst_cid
672 );
673 None
674 } else if !event.first_decode.is_initial()
675 && self.local_cid_generator.validate(dst_cid).is_err()
676 {
677 debug!("dropping packet with invalid CID");
681 None
682 } else if dst_cid.is_empty() {
683 trace!("dropping unrecognized short packet without ID");
684 None
685 } else {
686 self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
687 .map(DatagramEvent::Response)
688 }
689 }
690
691 fn stateless_reset(
692 &mut self,
693 now: Instant,
694 inciting_dgram_len: usize,
695 addresses: FourTuple,
696 dst_cid: ConnectionId,
697 buf: &mut Vec<u8>,
698 ) -> Option<Transmit> {
699 if self
700 .last_stateless_reset
701 .is_some_and(|last| last + self.config.min_reset_interval > now)
702 {
703 debug!("ignoring unexpected packet within minimum stateless reset interval");
704 return None;
705 }
706
707 const MIN_PADDING_LEN: usize = 5;
709
710 let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
713 Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
714 _ => {
715 debug!(
716 "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
717 inciting_dgram_len
718 );
719 return None;
720 }
721 };
722
723 debug!(
724 "sending stateless reset for {} to {}",
725 dst_cid, addresses.remote
726 );
727 self.last_stateless_reset = Some(now);
728 const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
730 let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
731 max_padding_len
732 } else {
733 self.rng
734 .gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
735 };
736 buf.reserve(padding_len + RESET_TOKEN_SIZE);
737 buf.resize(padding_len, 0);
738 self.rng.fill_bytes(&mut buf[0..padding_len]);
739 buf[0] = 0b0100_0000 | (buf[0] >> 2);
740 buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
741
742 debug_assert!(buf.len() < inciting_dgram_len);
743
744 Some(Transmit {
745 destination: addresses.remote,
746 ecn: None,
747 size: buf.len(),
748 segment_size: None,
749 src_ip: addresses.local_ip,
750 })
751 }
752
753 pub fn connect(
755 &mut self,
756 now: Instant,
757 config: ClientConfig,
758 remote: SocketAddr,
759 server_name: &str,
760 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
761 if self.cids_exhausted() {
762 return Err(ConnectError::CidsExhausted);
763 }
764 if remote.port() == 0 || remote.ip().is_unspecified() {
765 return Err(ConnectError::InvalidRemoteAddress(remote));
766 }
767 if !self.config.supported_versions.contains(&config.version) {
768 return Err(ConnectError::UnsupportedVersion);
769 }
770
771 let remote_id = (config.initial_dst_cid_provider)();
772 trace!(initial_dcid = %remote_id);
773
774 let ch = ConnectionHandle(self.connections.vacant_key());
775 let loc_cid = self.new_cid(ch);
776 let params = TransportParameters::new(
777 &config.transport,
778 &self.config,
779 self.local_cid_generator.as_ref(),
780 loc_cid,
781 None,
782 &mut self.rng,
783 );
784 let tls = config
785 .crypto
786 .start_session(config.version, server_name, ¶ms)?;
787
788 let conn = self.add_connection(
789 ch,
790 config.version,
791 remote_id,
792 loc_cid,
793 remote_id,
794 FourTuple {
795 remote,
796 local_ip: None,
797 },
798 now,
799 tls,
800 config.transport,
801 SideArgs::Client {
802 token_store: config.token_store,
803 server_name: server_name.into(),
804 },
805 );
806 Ok((ch, conn))
807 }
808
809 fn send_new_identifiers(
810 &mut self,
811 now: Instant,
812 ch: ConnectionHandle,
813 num: u64,
814 ) -> ConnectionEvent {
815 let mut ids = vec![];
816 for _ in 0..num {
817 let id = self.new_cid(ch);
818 let meta = &mut self.connections[ch];
819 let sequence = meta.cids_issued;
820 meta.cids_issued += 1;
821 meta.loc_cids.insert(sequence, id);
822 ids.push(IssuedCid {
823 sequence,
824 id,
825 reset_token: ResetToken::new(&*self.config.reset_key, id),
826 });
827 }
828 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
829 }
830
831 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
833 loop {
834 let cid = self.local_cid_generator.generate_cid();
835 if cid.is_empty() {
836 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
838 return cid;
839 }
840 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
841 e.insert(ch);
842 break cid;
843 }
844 }
845 }
846
847 fn handle_first_packet(
848 &mut self,
849 datagram_len: usize,
850 event: DatagramConnectionEvent,
851 addresses: FourTuple,
852 buf: &mut Vec<u8>,
853 ) -> Option<DatagramEvent> {
854 let dst_cid = event.first_decode.dst_cid();
855 let header = event.first_decode.initial_header().unwrap();
856
857 let Some(server_config) = &self.server_config else {
858 debug!("packet for unrecognized connection {}", dst_cid);
859 return self
860 .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
861 .map(DatagramEvent::Response);
862 };
863
864 if datagram_len < MIN_INITIAL_SIZE as usize {
865 debug!("ignoring short initial for connection {}", dst_cid);
866 return None;
867 }
868
869 let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
870 Ok(keys) => keys,
871 Err(UnsupportedVersion) => {
872 debug!(
875 "ignoring initial packet version {:#x} unsupported by cryptographic layer",
876 header.version
877 );
878 return None;
879 }
880 };
881
882 if let Err(reason) = self.early_validate_first_packet(header) {
883 return Some(DatagramEvent::Response(self.initial_close(
884 header.version,
885 addresses,
886 &crypto,
887 &header.src_cid,
888 reason,
889 buf,
890 )));
891 }
892
893 let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
894 Ok(packet) => packet,
895 Err(e) => {
896 trace!("unable to decode initial packet: {}", e);
897 return None;
898 }
899 };
900
901 if !packet.reserved_bits_valid() {
902 debug!("dropping connection attempt with invalid reserved bits");
903 return None;
904 }
905
906 let Header::Initial(header) = packet.header else {
907 panic!("non-initial packet in handle_first_packet()");
908 };
909
910 let server_config = self.server_config.as_ref().unwrap().clone();
911
912 let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
913 Ok(token) => token,
914 Err(InvalidRetryTokenError) => {
915 debug!("rejecting invalid retry token");
916 return Some(DatagramEvent::Response(self.initial_close(
917 header.version,
918 addresses,
919 &crypto,
920 &header.src_cid,
921 TransportError::INVALID_TOKEN(""),
922 buf,
923 )));
924 }
925 };
926
927 let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
928 self.index
929 .insert_initial_incoming(header.dst_cid, incoming_idx);
930
931 Some(DatagramEvent::NewConnection(Incoming {
932 received_at: event.now,
933 addresses,
934 ecn: event.ecn,
935 packet: InitialPacket {
936 header,
937 header_data: packet.header_data,
938 payload: packet.payload,
939 },
940 rest: event.remaining,
941 crypto,
942 token,
943 incoming_idx,
944 improper_drop_warner: IncomingImproperDropWarner,
945 }))
946 }
947
948 #[allow(clippy::result_large_err)]
951 pub fn accept(
952 &mut self,
953 mut incoming: Incoming,
954 now: Instant,
955 buf: &mut Vec<u8>,
956 server_config: Option<Arc<ServerConfig>>,
957 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
958 let remote_address_validated = incoming.remote_address_validated();
959 incoming.improper_drop_warner.dismiss();
960 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
961 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
962
963 let packet_number = incoming.packet.header.number.expand(0);
964 let InitialHeader {
965 src_cid,
966 dst_cid,
967 version,
968 ..
969 } = incoming.packet.header;
970 let server_config =
971 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
972
973 if server_config
974 .transport
975 .max_idle_timeout
976 .is_some_and(|timeout| {
977 incoming.received_at + Duration::from_millis(timeout.into()) <= now
978 })
979 {
980 debug!("abandoning accept of stale initial");
981 self.index.remove_initial(dst_cid);
982 return Err(AcceptError {
983 cause: ConnectionError::TimedOut,
984 response: None,
985 });
986 }
987
988 if self.cids_exhausted() {
989 debug!("refusing connection");
990 self.index.remove_initial(dst_cid);
991 return Err(AcceptError {
992 cause: ConnectionError::CidsExhausted,
993 response: Some(self.initial_close(
994 version,
995 incoming.addresses,
996 &incoming.crypto,
997 &src_cid,
998 TransportError::CONNECTION_REFUSED(""),
999 buf,
1000 )),
1001 });
1002 }
1003
1004 if incoming
1005 .crypto
1006 .packet
1007 .remote
1008 .decrypt(
1009 packet_number,
1010 &incoming.packet.header_data,
1011 &mut incoming.packet.payload,
1012 )
1013 .is_err()
1014 {
1015 debug!(packet_number, "failed to authenticate initial packet");
1016 self.index.remove_initial(dst_cid);
1017 return Err(AcceptError {
1018 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
1019 response: None,
1020 });
1021 };
1022
1023 let ch = ConnectionHandle(self.connections.vacant_key());
1024 let loc_cid = self.new_cid(ch);
1025 let mut params = TransportParameters::new(
1026 &server_config.transport,
1027 &self.config,
1028 self.local_cid_generator.as_ref(),
1029 loc_cid,
1030 Some(&server_config),
1031 &mut self.rng,
1032 );
1033 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
1034 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
1035 params.retry_src_cid = incoming.token.retry_src_cid;
1036 let mut pref_addr_cid = None;
1037 if server_config.has_preferred_address() {
1038 let cid = self.new_cid(ch);
1039 pref_addr_cid = Some(cid);
1040 params.preferred_address = Some(PreferredAddress {
1041 address_v4: server_config.preferred_address_v4,
1042 address_v6: server_config.preferred_address_v6,
1043 connection_id: cid,
1044 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
1045 });
1046 }
1047
1048 let tls = server_config.crypto.clone().start_session(version, ¶ms);
1049 let transport_config = server_config.transport.clone();
1050 let mut conn = self.add_connection(
1051 ch,
1052 version,
1053 dst_cid,
1054 loc_cid,
1055 src_cid,
1056 incoming.addresses,
1057 incoming.received_at,
1058 tls,
1059 transport_config,
1060 SideArgs::Server {
1061 server_config,
1062 pref_addr_cid,
1063 path_validated: remote_address_validated,
1064 },
1065 );
1066 self.index.insert_initial(dst_cid, ch);
1067
1068 match conn.handle_first_packet(
1069 incoming.received_at,
1070 incoming.addresses.remote,
1071 incoming.ecn,
1072 packet_number,
1073 incoming.packet,
1074 incoming.rest,
1075 ) {
1076 Ok(()) => {
1077 trace!(id = ch.0, icid = %dst_cid, "new connection");
1078
1079 for event in incoming_buffer.datagrams {
1080 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
1081 }
1082
1083 Ok((ch, conn))
1084 }
1085 Err(e) => {
1086 debug!("handshake failed: {}", e);
1087 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
1088 let response = match e {
1089 ConnectionError::TransportError(ref e) => Some(self.initial_close(
1090 version,
1091 incoming.addresses,
1092 &incoming.crypto,
1093 &src_cid,
1094 e.clone(),
1095 buf,
1096 )),
1097 _ => None,
1098 };
1099 Err(AcceptError { cause: e, response })
1100 }
1101 }
1102 }
1103
1104 fn early_validate_first_packet(
1106 &mut self,
1107 header: &ProtectedInitialHeader,
1108 ) -> Result<(), TransportError> {
1109 let config = &self.server_config.as_ref().unwrap();
1110 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
1111 return Err(TransportError::CONNECTION_REFUSED(""));
1112 }
1113
1114 if header.dst_cid.len() < 8
1119 && (header.token_pos.is_empty()
1120 || header.dst_cid.len() != self.local_cid_generator.cid_len())
1121 {
1122 debug!(
1123 "rejecting connection due to invalid DCID length {}",
1124 header.dst_cid.len()
1125 );
1126 return Err(TransportError::PROTOCOL_VIOLATION(
1127 "invalid destination CID length",
1128 ));
1129 }
1130
1131 Ok(())
1132 }
1133
1134 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
1136 self.clean_up_incoming(&incoming);
1137 incoming.improper_drop_warner.dismiss();
1138
1139 self.initial_close(
1140 incoming.packet.header.version,
1141 incoming.addresses,
1142 &incoming.crypto,
1143 &incoming.packet.header.src_cid,
1144 TransportError::CONNECTION_REFUSED(""),
1145 buf,
1146 )
1147 }
1148
1149 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
1153 if !incoming.may_retry() {
1154 return Err(RetryError(Box::new(incoming)));
1155 }
1156
1157 self.clean_up_incoming(&incoming);
1158 incoming.improper_drop_warner.dismiss();
1159
1160 let server_config = self.server_config.as_ref().unwrap();
1161
1162 let loc_cid = self.local_cid_generator.generate_cid();
1169
1170 let payload = TokenPayload::Retry {
1171 address: incoming.addresses.remote,
1172 orig_dst_cid: incoming.packet.header.dst_cid,
1173 issued: server_config.time_source.now(),
1174 };
1175 let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
1176
1177 let header = Header::Retry {
1178 src_cid: loc_cid,
1179 dst_cid: incoming.packet.header.src_cid,
1180 version: incoming.packet.header.version,
1181 };
1182
1183 let encode = header.encode(buf);
1184 buf.put_slice(&token);
1185 buf.extend_from_slice(&server_config.crypto.retry_tag(
1186 incoming.packet.header.version,
1187 &incoming.packet.header.dst_cid,
1188 buf,
1189 ));
1190 encode.finish(buf, &*incoming.crypto.header.local, None);
1191
1192 Ok(Transmit {
1193 destination: incoming.addresses.remote,
1194 ecn: None,
1195 size: buf.len(),
1196 segment_size: None,
1197 src_ip: incoming.addresses.local_ip,
1198 })
1199 }
1200
1201 pub fn ignore(&mut self, incoming: Incoming) {
1206 self.clean_up_incoming(&incoming);
1207 incoming.improper_drop_warner.dismiss();
1208 }
1209
1210 fn clean_up_incoming(&mut self, incoming: &Incoming) {
1212 self.index.remove_initial(incoming.packet.header.dst_cid);
1213 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
1214 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
1215 }
1216
1217 fn add_connection(
1218 &mut self,
1219 ch: ConnectionHandle,
1220 version: u32,
1221 init_cid: ConnectionId,
1222 loc_cid: ConnectionId,
1223 rem_cid: ConnectionId,
1224 addresses: FourTuple,
1225 now: Instant,
1226 tls: Box<dyn crypto::Session>,
1227 transport_config: Arc<TransportConfig>,
1228 side_args: SideArgs,
1229 ) -> Connection {
1230 let mut rng_seed = [0; 32];
1231 self.rng.fill_bytes(&mut rng_seed);
1232 let side = side_args.side();
1233 let pref_addr_cid = side_args.pref_addr_cid();
1234 let conn = Connection::new(
1235 self.config.clone(),
1236 transport_config,
1237 init_cid,
1238 loc_cid,
1239 rem_cid,
1240 addresses.remote,
1241 addresses.local_ip,
1242 tls,
1243 self.local_cid_generator.as_ref(),
1244 now,
1245 version,
1246 self.allow_mtud,
1247 rng_seed,
1248 side_args,
1249 );
1250
1251 let mut cids_issued = 0;
1252 let mut loc_cids = FxHashMap::default();
1253
1254 loc_cids.insert(cids_issued, loc_cid);
1255 cids_issued += 1;
1256
1257 if let Some(cid) = pref_addr_cid {
1258 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
1259 loc_cids.insert(cids_issued, cid);
1260 cids_issued += 1;
1261 }
1262
1263 let id = self.connections.insert(ConnectionMeta {
1264 init_cid,
1265 cids_issued,
1266 loc_cids,
1267 addresses,
1268 side,
1269 reset_token: None,
1270 peer_id: None,
1271 });
1272 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
1273
1274 self.index.insert_conn(addresses, loc_cid, ch, side);
1275
1276 conn
1277 }
1278
1279 fn initial_close(
1280 &mut self,
1281 version: u32,
1282 addresses: FourTuple,
1283 crypto: &Keys,
1284 remote_id: &ConnectionId,
1285 reason: TransportError,
1286 buf: &mut Vec<u8>,
1287 ) -> Transmit {
1288 let local_id = self.local_cid_generator.generate_cid();
1292 let number = PacketNumber::U8(0);
1293 let header = Header::Initial(InitialHeader {
1294 dst_cid: *remote_id,
1295 src_cid: local_id,
1296 number,
1297 token: Bytes::new(),
1298 version,
1299 });
1300
1301 let partial_encode = header.encode(buf);
1302 let max_len =
1303 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
1304 frame::Close::from(reason).encode(buf, max_len);
1305 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
1306 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
1307 Transmit {
1308 destination: addresses.remote,
1309 ecn: None,
1310 size: buf.len(),
1311 segment_size: None,
1312 src_ip: addresses.local_ip,
1313 }
1314 }
1315
1316 pub fn config(&self) -> &EndpointConfig {
1318 &self.config
1319 }
1320
1321 pub fn open_connections(&self) -> usize {
1323 self.connections.len()
1324 }
1325
1326 pub fn incoming_buffer_bytes(&self) -> u64 {
1329 self.all_incoming_buffers_total_bytes
1330 }
1331
/// Test helper: returns the number of tracked connections.
///
/// Debug builds also cross-check that count against the sizes of the routing
/// tables in `self.index`.
#[cfg(test)]
pub(crate) fn known_connections(&self) -> usize {
    let x = self.connections.len();

    // The initial-CID table is expected to match the connection count exactly.
    debug_assert_eq!(x, self.index.connection_ids_initial.len());

    // The remaining tables may hold at most one entry per connection.
    debug_assert!(x >= self.index.connection_reset_tokens.0.len());
    debug_assert!(x >= self.index.incoming_connection_remotes.len());
    debug_assert!(x >= self.index.outgoing_connection_remotes.len());

    x
}
1343
/// Test helper: returns the number of CIDs present in the CID routing table.
#[cfg(test)]
pub(crate) fn known_cids(&self) -> usize {
    self.index.connection_ids.len()
}
1348
1349 fn cids_exhausted(&self) -> bool {
1354 self.local_cid_generator.cid_len() <= 4
1355 && self.local_cid_generator.cid_len() != 0
1356 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
1357 - self.index.connection_ids.len())
1358 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
1359 }
1360}
1361
1362impl fmt::Debug for Endpoint {
1363 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1364 fmt.debug_struct("Endpoint")
1365 .field("rng", &self.rng)
1366 .field("index", &self.index)
1367 .field("connections", &self.connections)
1368 .field("config", &self.config)
1369 .field("server_config", &self.server_config)
1370 .field("incoming_buffers.len", &self.incoming_buffers.len())
1372 .field(
1373 "all_incoming_buffers_total_bytes",
1374 &self.all_incoming_buffers_total_bytes,
1375 )
1376 .finish()
1377 }
1378}
1379
1380#[derive(Default)]
1382struct IncomingBuffer {
1383 datagrams: Vec<DatagramConnectionEvent>,
1384 total_bytes: u64,
1385}
1386
1387#[derive(Copy, Clone, Debug)]
1389enum RouteDatagramTo {
1390 Incoming(usize),
1391 Connection(ConnectionHandle),
1392}
1393
1394#[derive(Default, Debug)]
1396struct ConnectionIndex {
1397 connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1403 connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
1407 incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1411 outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1420 connection_reset_tokens: ResetTokenTable,
1425}
1426
1427impl ConnectionIndex {
1428 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1430 if dst_cid.is_empty() {
1431 return;
1432 }
1433 self.connection_ids_initial
1434 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1435 }
1436
1437 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1439 if dst_cid.is_empty() {
1440 return;
1441 }
1442 let removed = self.connection_ids_initial.remove(&dst_cid);
1443 debug_assert!(removed.is_some());
1444 }
1445
1446 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1448 if dst_cid.is_empty() {
1449 return;
1450 }
1451 self.connection_ids_initial
1452 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1453 }
1454
1455 fn insert_conn(
1458 &mut self,
1459 addresses: FourTuple,
1460 dst_cid: ConnectionId,
1461 connection: ConnectionHandle,
1462 side: Side,
1463 ) {
1464 match dst_cid.len() {
1465 0 => match side {
1466 Side::Server => {
1467 self.incoming_connection_remotes
1468 .insert(addresses, connection);
1469 }
1470 Side::Client => {
1471 self.outgoing_connection_remotes
1472 .insert(addresses.remote, connection);
1473 }
1474 },
1475 _ => {
1476 self.connection_ids.insert(dst_cid, connection);
1477 }
1478 }
1479 }
1480
1481 fn retire(&mut self, dst_cid: ConnectionId) {
1483 self.connection_ids.remove(&dst_cid);
1484 }
1485
1486 fn remove(&mut self, conn: &ConnectionMeta) {
1488 if conn.side.is_server() {
1489 self.remove_initial(conn.init_cid);
1490 }
1491 for cid in conn.loc_cids.values() {
1492 self.connection_ids.remove(cid);
1493 }
1494 self.incoming_connection_remotes.remove(&conn.addresses);
1495 self.outgoing_connection_remotes
1496 .remove(&conn.addresses.remote);
1497 if let Some((remote, token)) = conn.reset_token {
1498 self.connection_reset_tokens.remove(remote, token);
1499 }
1500 }
1501
1502 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1504 let dst_cid = datagram.dst_cid();
1505 let is_empty_cid = dst_cid.is_empty();
1506
1507 if !is_empty_cid {
1509 if let Some(&ch) = self.connection_ids.get(dst_cid) {
1510 return Some(RouteDatagramTo::Connection(ch));
1511 }
1512 }
1513
1514 if datagram.is_initial() || datagram.is_0rtt() {
1516 if let Some(&ch) = self.connection_ids_initial.get(dst_cid) {
1517 return Some(ch);
1518 }
1519 }
1520
1521 if is_empty_cid {
1523 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1525 return Some(RouteDatagramTo::Connection(ch));
1526 }
1527 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1528 return Some(RouteDatagramTo::Connection(ch));
1529 }
1530 }
1531
1532 let data = datagram.data();
1534 if data.len() < RESET_TOKEN_SIZE {
1535 return None;
1536 }
1537 self.connection_reset_tokens
1538 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1539 .cloned()
1540 .map(RouteDatagramTo::Connection)
1541 }
1542}
1543
1544#[derive(Debug)]
1545pub(crate) struct ConnectionMeta {
1546 init_cid: ConnectionId,
1547 cids_issued: u64,
1549 loc_cids: FxHashMap<u64, ConnectionId>,
1550 addresses: FourTuple,
1555 side: Side,
1556 reset_token: Option<(SocketAddr, ResetToken)>,
1559 peer_id: Option<PeerId>,
1561}
1562
/// Handle identifying a connection within an endpoint.
///
/// Wraps the connection's key in the endpoint's connection table.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1566
1567impl From<ConnectionHandle> for usize {
1568 fn from(x: ConnectionHandle) -> Self {
1569 x.0
1570 }
1571}
1572
1573impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1574 type Output = ConnectionMeta;
1575 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1576 &self[ch.0]
1577 }
1578}
1579
1580impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1581 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1582 &mut self[ch.0]
1583 }
1584}
1585
1586pub enum DatagramEvent {
1588 ConnectionEvent(ConnectionHandle, ConnectionEvent),
1590 NewConnection(Incoming),
1592 Response(Transmit),
1594}
1595
1596pub struct Incoming {
1598 received_at: Instant,
1599 addresses: FourTuple,
1600 ecn: Option<EcnCodepoint>,
1601 packet: InitialPacket,
1602 rest: Option<BytesMut>,
1603 crypto: Keys,
1604 token: IncomingToken,
1605 incoming_idx: usize,
1606 improper_drop_warner: IncomingImproperDropWarner,
1607}
1608
1609impl Incoming {
1610 pub fn local_ip(&self) -> Option<IpAddr> {
1614 self.addresses.local_ip
1615 }
1616
1617 pub fn remote_address(&self) -> SocketAddr {
1619 self.addresses.remote
1620 }
1621
1622 pub fn remote_address_validated(&self) -> bool {
1630 self.token.validated
1631 }
1632
1633 pub fn may_retry(&self) -> bool {
1638 self.token.retry_src_cid.is_none()
1639 }
1640
1641 pub fn orig_dst_cid(&self) -> &ConnectionId {
1643 &self.token.orig_dst_cid
1644 }
1645}
1646
1647impl fmt::Debug for Incoming {
1648 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1649 f.debug_struct("Incoming")
1650 .field("addresses", &self.addresses)
1651 .field("ecn", &self.ecn)
1652 .field("token", &self.token)
1655 .field("incoming_idx", &self.incoming_idx)
1656 .finish_non_exhaustive()
1658 }
1659}
1660
/// Zero-sized guard that logs a warning when dropped without being dismissed.
struct IncomingImproperDropWarner;
1662
1663impl IncomingImproperDropWarner {
1664 fn dismiss(self) {
1665 mem::forget(self);
1666 }
1667}
1668
1669impl Drop for IncomingImproperDropWarner {
1670 fn drop(&mut self) {
1671 warn!(
1672 "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1673 (may cause memory leak and eventual inability to accept new connections)"
1674 );
1675 }
1676}
1677
1678#[derive(Debug, Error, Clone, PartialEq, Eq)]
1682pub enum ConnectError {
1683 #[error("endpoint stopping")]
1687 EndpointStopping,
1688 #[error("CIDs exhausted")]
1692 CidsExhausted,
1693 #[error("invalid server name: {0}")]
1695 InvalidServerName(String),
1696 #[error("invalid remote address: {0}")]
1700 InvalidRemoteAddress(SocketAddr),
1701 #[error("no default client config")]
1705 NoDefaultClientConfig,
1706 #[error("unsupported QUIC version")]
1708 UnsupportedVersion,
1709}
1710
1711#[derive(Debug)]
1713pub struct AcceptError {
1714 pub cause: ConnectionError,
1716 pub response: Option<Transmit>,
1718}
1719
1720#[derive(Debug, Error)]
1722#[error("retry() with validated Incoming")]
1723pub struct RetryError(Box<Incoming>);
1724
1725impl RetryError {
1726 pub fn into_incoming(self) -> Incoming {
1728 *self.0
1729 }
1730}
1731
1732#[derive(Default, Debug)]
1737struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1738
1739impl ResetTokenTable {
1740 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1741 self.0
1742 .entry(remote)
1743 .or_default()
1744 .insert(token, ch)
1745 .is_some()
1746 }
1747
1748 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1749 use std::collections::hash_map::Entry;
1750 match self.0.entry(remote) {
1751 Entry::Vacant(_) => {}
1752 Entry::Occupied(mut e) => {
1753 e.get_mut().remove(&token);
1754 if e.get().is_empty() {
1755 e.remove_entry();
1756 }
1757 }
1758 }
1759 }
1760
1761 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1762 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1763 self.0.get(&remote)?.get(&token)
1764 }
1765}
1766
/// Address pair identifying a network path: the peer's socket address and,
/// optionally, the local IP address.
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
    /// Socket address of the remote peer.
    remote: SocketAddr,
    /// Local IP address, when known.
    local_ip: Option<IpAddr>,
}