1use std::collections::{HashMap, HashSet};
2use std::io;
3use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4use std::sync::Arc;
5use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6#[cfg(any(target_os = "linux", target_os = "android"))]
7use std::{mem, os::fd::AsRawFd};
8
9use bytes::{Bytes, BytesMut};
10use hmac::{Hmac, Mac};
11use sha2::Sha256;
12use socket2::{Domain, Protocol, Socket, Type};
13use tokio::net::UdpSocket;
14use tracing::{debug, warn};
15use zeroize::Zeroizing;
16
17use crate::error::DecodeError;
18use crate::handshake::{
19 AlreadyConnected, ConnectionBanned, IncompatibleProtocolVersion, IpRecentlyConnected,
20 NoFreeIncomingConnections, OfflinePacket, OpenConnectionReply1, OpenConnectionReply2,
21 Request2ParsePath, UnconnectedPong,
22};
23use crate::protocol::connected::{
24 ConnectedControlPacket, ConnectedPing, ConnectedPong, ConnectionRequestAccepted,
25 SYSTEM_ADDRESS_COUNT,
26};
27use crate::protocol::constants::{MAXIMUM_MTU_SIZE, MINIMUM_MTU_SIZE, RAKNET_PROTOCOL_VERSION};
28use crate::protocol::datagram::{Datagram, DatagramPayload};
29use crate::protocol::frame::Frame;
30use crate::protocol::reliability::Reliability;
31use crate::protocol::sequence24::Sequence24;
32use crate::session::{
33 QueuePayloadResult, RakPriority, ReceiptProgress, Session, SessionMetricsSnapshot, SessionState,
34};
35
36use super::config::{
37 ProcessingBudgetConfig, Request2ServerAddrPolicy, TransportConfig, TransportSocketTuning,
38};
39use super::proxy::{InboundProxyRoute, OutboundProxyRoute, ProxyRouter};
40use super::rate_limiter::{
41 BlockReason, ProcessingBudgetDecision, ProcessingBudgetMetricsSnapshot, RateLimitDecision,
42 RateLimiter, RateLimiterConfigSnapshot, RateLimiterMetricsSnapshot,
43};
44use super::session_pipeline::{
45 PipelineFrameAction, SessionPipeline, SessionPipelineMetricsSnapshot,
46};
47
// Passed to `Session::on_tick` from the receive path, both directly and
// multiplied by the configured MTU. Presumably a datagram-count cap and a
// byte cap for new / resent datagrams -- confirm against `Session::on_tick`.
const RECV_PATH_MAX_NEW_DATAGRAMS: usize = 6;
const RECV_PATH_MAX_RESEND_DATAGRAMS: usize = 6;
// Length in bytes of a cookie key (see `SecretCookieKey`).
const COOKIE_KEY_LEN: usize = 32;

// HMAC instantiated with SHA-256.
type HmacSha256 = Hmac<Sha256>;
// Fixed-size cookie key wrapped in `Zeroizing`.
type SecretCookieKey = Zeroizing<[u8; COOKIE_KEY_LEN]>;
54
/// A received frame as handed to the caller inside
/// [`TransportEvent::ConnectedFrames`].
///
/// Every field is taken directly from the corresponding field of the
/// decoded `Frame` (see `ConnectedFrameDelivery::from_frame`).
#[derive(Debug)]
pub struct ConnectedFrameDelivery {
    /// Frame payload bytes.
    pub payload: Bytes,
    /// Reliability taken from the frame header.
    pub reliability: Reliability,
    /// Reliable index carried by the frame, if any.
    pub reliable_index: Option<Sequence24>,
    /// Sequence index carried by the frame, if any.
    pub sequence_index: Option<Sequence24>,
    /// Ordering index carried by the frame, if any.
    pub ordering_index: Option<Sequence24>,
    /// Ordering channel carried by the frame, if any.
    pub ordering_channel: Option<u8>,
}
64
65impl ConnectedFrameDelivery {
66 fn from_frame(frame: Frame) -> Self {
67 Self {
68 payload: frame.payload,
69 reliability: frame.header.reliability,
70 reliable_index: frame.reliable_index,
71 sequence_index: frame.sequence_index,
72 ordering_index: frame.ordering_index,
73 ordering_channel: frame.ordering_channel,
74 }
75 }
76}
77
/// Why the remote peer's session was closed, as reported in
/// [`TransportEvent::PeerDisconnected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoteDisconnectReason {
    /// Disconnection notification, optionally carrying a reason code.
    DisconnectionNotification { reason_code: Option<u8> },
    /// "Detect lost connection" signal.
    /// NOTE(review): the exact wire packet is decided in
    /// `apply_connected_control`, which is not visible here -- confirm.
    DetectLostConnection,
}
83
/// Outcome of processing one received UDP datagram in
/// `TransportServer::recv_and_process`.
#[derive(Debug)]
pub enum TransportEvent {
    /// The datagram was dropped by the rate limiter (global limit or blocked
    /// IP) or because the processing budget was exhausted.
    RateLimited {
        addr: SocketAddr,
    },
    /// The proxy router returned `InboundProxyRoute::Drop`; `addr` is the
    /// observed source address.
    ProxyDropped {
        addr: SocketAddr,
    },
    /// A connected datagram arrived for an address without a session while
    /// the session table was already at `max_sessions`.
    SessionLimitReached {
        addr: SocketAddr,
    },
    /// An offline packet was decoded and `handle_offline_packet` produced no
    /// more specific event for it.
    OfflinePacket {
        addr: SocketAddr,
        packet: OfflinePacket,
    },
    /// A connected datagram was processed by the session.
    ConnectedFrames {
        addr: SocketAddr,
        /// Set only on the datagram that moved the session to `Connected`;
        /// read from the pending handshake entry for `addr`.
        client_guid: Option<u64>,
        /// Application frames ready for delivery (cleared when the session
        /// was closed while processing this datagram).
        frames: Vec<ConnectedFrameDelivery>,
        /// Always equal to `frames.len()`.
        frame_count: usize,
        /// Result of `Session::process_incoming_receipts` for this datagram.
        receipts: ReceiptProgress,
    },
    /// A connected datagram arrived from an address that has neither a
    /// session nor a pending handshake.
    ConnectedDatagramDroppedNoSession {
        addr: SocketAddr,
    },
    /// The session was closed and a remote disconnect reason was recorded.
    PeerDisconnected {
        addr: SocketAddr,
        reason: RemoteDisconnectReason,
    },
    /// The datagram (or a control packet inside it) could not be decoded.
    /// An empty datagram is reported with `DecodeError::UnexpectedEof`.
    DecodeError {
        addr: SocketAddr,
        error: DecodeError,
    },
}
118
/// Result of dispatching a payload to a peer's queue.
///
/// NOTE(review): the producer of this type is not visible in this part of
/// the file; the variant meanings below are inferred from their names and
/// should be confirmed against the dispatching code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueDispatchResult {
    /// Presumably: no session exists for the target address.
    MissingSession,
    /// Presumably: the payload was queued; `reliable_bytes` is reported
    /// alongside -- TODO confirm what it counts.
    Enqueued { reliable_bytes: usize },
    /// Presumably: the payload was discarded.
    Dropped,
    /// Presumably: the payload was not queued now and may be retried.
    Deferred,
    /// Presumably: the dispatch caused or observed a disconnect.
    Disconnected,
}
127
/// Rate limiter settings exposed through `TransportServer::rate_limit_config`
/// and accepted by `TransportServer::set_rate_limit_config`.
///
/// The fields map onto the rate limiter's `per_ip_limit`, `global_limit`,
/// `window` and `block_duration` values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransportRateLimitConfig {
    /// Per-IP packet limit.
    pub per_ip_packet_limit: usize,
    /// Global packet limit.
    pub global_packet_limit: usize,
    /// Rate limiter window.
    pub rate_window: Duration,
    /// Duration applied when an address is blocked.
    pub block_duration: Duration,
}
135
/// Processing budget settings exposed through
/// `TransportServer::processing_budget_config` and accepted by
/// `TransportServer::set_processing_budget_config`.
///
/// Field-for-field mirror of the rate limiter's `ProcessingBudgetConfig`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TransportProcessingBudgetConfig {
    pub enabled: bool,
    pub per_ip_refill_units_per_sec: u32,
    pub per_ip_burst_units: u32,
    pub global_refill_units_per_sec: u32,
    pub global_burst_units: u32,
    pub bucket_idle_ttl: Duration,
}
145
/// Point-in-time copy of transport metrics, produced by
/// `TransportServer::metrics_snapshot`.
///
/// The type derives `Default`, so every field that the snapshot builder does
/// not fill in explicitly starts at zero.
#[derive(Debug, Default, Clone, Copy)]
pub struct TransportMetricsSnapshot {
    // Session totals and forwarding counters; copied from the server's own
    // fields of the same name (`session_count` is the session map length).
    pub session_count: usize,
    pub sessions_started_total: u64,
    pub sessions_closed_total: u64,
    pub packets_forwarded_total: u64,
    pub bytes_forwarded_total: u64,
    // NOTE(review): the fields from here through `unhandled_frames_dropped`
    // are mostly not server-level counters; presumably aggregated from
    // per-session / per-pipeline snapshots -- confirm in `metrics_snapshot`.
    pub pending_outgoing_frames: usize,
    pub pending_outgoing_bytes: usize,
    pub pending_unhandled_frames: usize,
    pub pending_unhandled_bytes: usize,
    pub ingress_datagrams: u64,
    pub ingress_frames: u64,
    pub duplicate_reliable_drops: u64,
    pub ordered_stale_drops: u64,
    pub ordered_buffer_full_drops: u64,
    pub sequenced_stale_drops: u64,
    pub sequenced_missing_index_drops: u64,
    pub reliable_sent_datagrams: u64,
    pub resent_datagrams: u64,
    pub ack_out_total: u64,
    pub nack_out_total: u64,
    pub acked_datagrams: u64,
    pub nacked_datagrams: u64,
    pub split_ttl_drops: u64,
    pub outgoing_queue_drops: u64,
    pub outgoing_queue_defers: u64,
    pub outgoing_queue_disconnects: u64,
    pub backpressure_delays: u64,
    pub backpressure_drops: u64,
    pub backpressure_disconnects: u64,
    pub local_requested_disconnects: u64,
    pub remote_disconnect_notifications: u64,
    pub remote_detect_lost_disconnects: u64,
    pub illegal_state_transitions: u64,
    pub timed_out_sessions: u64,
    pub keepalive_pings_sent: u64,
    pub unhandled_frames_queued: u64,
    pub unhandled_frames_flushed: u64,
    pub unhandled_frames_dropped: u64,
    // NOTE(review): `rate_*` and `processing_budget_*` presumably come from
    // the rate limiter's metrics snapshots -- confirm.
    pub rate_global_limit_hits: u64,
    pub rate_ip_block_hits: u64,
    pub rate_ip_block_hits_rate_exceeded: u64,
    pub rate_ip_block_hits_manual: u64,
    pub rate_ip_block_hits_handshake_heuristic: u64,
    pub rate_ip_block_hits_cookie_mismatch_guard: u64,
    pub rate_addresses_blocked: u64,
    pub rate_addresses_blocked_rate_exceeded: u64,
    pub rate_addresses_blocked_manual: u64,
    pub rate_addresses_blocked_handshake_heuristic: u64,
    pub rate_addresses_blocked_cookie_mismatch_guard: u64,
    pub rate_addresses_unblocked: u64,
    pub rate_blocked_addresses: usize,
    pub rate_exception_addresses: usize,
    pub processing_budget_drops_total: u64,
    pub processing_budget_drops_ip_exhausted_total: u64,
    pub processing_budget_drops_global_exhausted_total: u64,
    pub processing_budget_consumed_units_total: u64,
    pub processing_budget_active_ip_buckets: usize,
    // Cookie, handshake, request2 and proxy counters; copied from the
    // server's own fields of the same name.
    pub cookie_rotations: u64,
    pub cookie_mismatch_drops: u64,
    pub cookie_mismatch_blocks: u64,
    pub handshake_stage_cancel_drops: u64,
    pub handshake_req1_req2_timeouts: u64,
    pub handshake_reply2_connect_timeouts: u64,
    pub handshake_missing_req1_drops: u64,
    pub handshake_auto_blocks: u64,
    pub handshake_already_connected_rejects: u64,
    pub handshake_ip_recently_connected_rejects: u64,
    pub request2_server_addr_mismatch_drops: u64,
    pub request2_legacy_parse_hits: u64,
    pub request2_legacy_drops: u64,
    pub request2_ambiguous_parse_hits: u64,
    pub request2_ambiguous_drops: u64,
    pub proxy_inbound_reroutes: u64,
    pub proxy_inbound_drops: u64,
    pub proxy_outbound_reroutes: u64,
    pub proxy_outbound_drops: u64,
    // NOTE(review): presumably averages / ratios computed over the live
    // sessions -- confirm in `metrics_snapshot`.
    pub avg_srtt_ms: f64,
    pub avg_rttvar_ms: f64,
    pub avg_resend_rto_ms: f64,
    pub avg_congestion_window_packets: f64,
    pub resend_ratio: f64,
}
230
/// Stage of a handshake tracked in `PendingHandshake`.
///
/// NOTE(review): the transitions between stages happen in the offline packet
/// handling code, which is not visible here; the meanings below follow the
/// variant names and should be confirmed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PendingHandshakeStage {
    /// Presumably: waiting for the peer's second open-connection request.
    AwaitingRequest2,
    /// Presumably: waiting for the peer's connected connection request.
    AwaitingConnectionRequest,
}
236
/// Per-address handshake state, stored in
/// `TransportServer::pending_handshakes` keyed by the peer's `SocketAddr`.
#[derive(Debug, Clone, Copy)]
struct PendingHandshake {
    // NOTE(review): presumably the MTU negotiated during the handshake -- confirm.
    mtu: u16,
    // NOTE(review): presumably the cookie issued to the peer, if any -- confirm.
    cookie: Option<u32>,
    // Reported as `client_guid` in `TransportEvent::ConnectedFrames` when the
    // session for this address becomes connected.
    client_guid: Option<u64>,
    stage: PendingHandshakeStage,
    // NOTE(review): presumably the deadline used by
    // `prune_pending_handshakes` -- confirm.
    expires_at: Instant,
}
245
/// Per-IP handshake heuristic state, stored in
/// `TransportServer::handshake_heuristics`.
///
/// NOTE(review): presumably `score` accumulates `HandshakeViolation::score`
/// values within a window that began at `window_started_at` -- the updating
/// code is not visible here; confirm.
#[derive(Debug, Clone, Copy)]
struct HandshakeHeuristicState {
    window_started_at: Instant,
    score: u32,
}
251
/// Per-IP cookie mismatch state, stored in
/// `TransportServer::cookie_mismatch_guard_states`.
///
/// NOTE(review): presumably `mismatches` counts cookie mismatches within a
/// window that began at `window_started_at` -- the updating code is not
/// visible here; confirm.
#[derive(Debug, Clone, Copy)]
struct CookieMismatchGuardState {
    window_started_at: Instant,
    mismatches: u32,
}
257
/// Kinds of handshake misbehavior that carry a configurable score.
///
/// Each variant maps to one field of `config.handshake_heuristics`
/// (see `HandshakeViolation::score`).
#[derive(Debug, Clone, Copy)]
enum HandshakeViolation {
    /// Scored with `req1_req2_timeout_score`.
    Req1Req2Timeout,
    /// Scored with `reply2_connect_timeout_score`.
    Reply2ConnectTimeout,
    /// Scored with `missing_req1_score`.
    MissingPendingReq1,
    /// Scored with `cookie_mismatch_score`.
    CookieMismatch,
    /// Scored with `parse_anomaly_score`.
    ParseAnomalyDrop,
}
266
267impl HandshakeViolation {
268 fn score(self, config: &TransportConfig) -> u32 {
269 let h = config.handshake_heuristics;
270 match self {
271 Self::Req1Req2Timeout => h.req1_req2_timeout_score,
272 Self::Reply2ConnectTimeout => h.reply2_connect_timeout_score,
273 Self::MissingPendingReq1 => h.missing_req1_score,
274 Self::CookieMismatch => h.cookie_mismatch_score,
275 Self::ParseAnomalyDrop => h.parse_anomaly_score,
276 }
277 }
278}
279
/// What `recv_and_process` should do after `apply_connected_control` has
/// handled one connected control packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ControlAction {
    /// Keep processing the remaining frames.
    None,
    /// Stop processing frames and close the session.
    CloseSession {
        /// Reason reported through `TransportEvent::PeerDisconnected`, if any.
        remote_reason: Option<RemoteDisconnectReason>,
        /// When true, the `illegal_state_transitions` counter is incremented.
        illegal_state: bool,
    },
}
288
/// UDP transport server: owns one socket plus all per-peer session,
/// handshake, rate limiting and metrics state for that socket.
///
/// Instances are created through `bind` / `bind_shards`; all fields are
/// initialized in `with_socket`.
pub struct TransportServer {
    // Bound tokio UDP socket.
    socket: UdpSocket,
    // Receive scratch buffer; sized in `with_socket` to
    // max(config.mtu, MAXIMUM_MTU_SIZE, 2048) bytes.
    recv_buffer: Vec<u8>,
    config: TransportConfig,
    // Copy of `config.advertisement` as bytes.
    advertisement_bytes: Bytes,
    rate_limiter: RateLimiter,
    // Optional proxy router consulted for every inbound datagram.
    proxy_router: Option<Arc<dyn ProxyRouter>>,
    // Cookie keys; the next rotation is due at `next_cookie_rotation`
    // (creation time + `config.cookie_rotation_interval` initially).
    cookie_key_current: SecretCookieKey,
    cookie_key_previous: Option<SecretCookieKey>,
    next_cookie_rotation: Instant,
    // Per-peer state, keyed by the (possibly proxy-rewritten) session address.
    sessions: HashMap<SocketAddr, Session>,
    session_pipelines: HashMap<SocketAddr, SessionPipeline>,
    pending_handshakes: HashMap<SocketAddr, PendingHandshake>,
    // Monotonic counters reported through `metrics_snapshot`; all start at 0.
    sessions_started_total: u64,
    sessions_closed_total: u64,
    packets_forwarded_total: u64,
    bytes_forwarded_total: u64,
    illegal_state_transitions: u64,
    timed_out_sessions: u64,
    local_requested_disconnects: u64,
    remote_disconnect_notifications: u64,
    remote_detect_lost_disconnects: u64,
    keepalive_pings_sent: u64,
    cookie_rotations: u64,
    cookie_mismatch_drops: u64,
    cookie_mismatch_blocks: u64,
    handshake_stage_cancel_drops: u64,
    handshake_req1_req2_timeouts: u64,
    handshake_reply2_connect_timeouts: u64,
    handshake_missing_req1_drops: u64,
    handshake_auto_blocks: u64,
    handshake_already_connected_rejects: u64,
    handshake_ip_recently_connected_rejects: u64,
    request2_server_addr_mismatch_drops: u64,
    // Per-IP abuse tracking state.
    handshake_heuristics: HashMap<IpAddr, HandshakeHeuristicState>,
    cookie_mismatch_guard_states: HashMap<IpAddr, CookieMismatchGuardState>,
    ip_recently_connected_until: HashMap<IpAddr, Instant>,
    // More counters reported through `metrics_snapshot`.
    request2_legacy_parse_hits: u64,
    request2_legacy_drops: u64,
    request2_ambiguous_parse_hits: u64,
    request2_ambiguous_drops: u64,
    proxy_inbound_reroutes: u64,
    proxy_inbound_drops: u64,
    proxy_outbound_reroutes: u64,
    proxy_outbound_drops: u64,
}
335
336impl TransportServer {
/// Reports whether this build targets an OS on which several shards can each
/// bind their own socket to the same address via `SO_REUSEPORT`.
///
/// True for Linux, Android, macOS, iOS, FreeBSD, NetBSD and OpenBSD; false
/// everywhere else.
pub const fn supports_reuse_port_sharded_bind() -> bool {
    cfg!(target_os = "linux")
        || cfg!(target_os = "android")
        || cfg!(target_os = "macos")
        || cfg!(target_os = "ios")
        || cfg!(target_os = "freebsd")
        || cfg!(target_os = "netbsd")
        || cfg!(target_os = "openbsd")
}
348
349 pub async fn bind(config: TransportConfig) -> io::Result<Self> {
350 config.validate().map_err(invalid_config_io_error)?;
351 if config.split_ipv4_ipv6_bind {
352 return Err(io::Error::new(
353 io::ErrorKind::InvalidInput,
354 "split_ipv4_ipv6_bind requires bind_shards(); use shard_count >= 1",
355 ));
356 }
357 let socket = Self::bind_socket(
358 config.bind_addr,
359 config.reuse_port,
360 config.ipv6_only,
361 config.socket_tuning,
362 )
363 .await?;
364 Self::with_socket(config, socket)
365 }
366
367 pub async fn bind_shards(config: TransportConfig, shard_count: usize) -> io::Result<Vec<Self>> {
368 config.validate().map_err(invalid_config_io_error)?;
369 let bind_plan = Self::build_shard_bind_plan(&config, shard_count.max(1));
370 if !config.reuse_port && Self::has_duplicate_bind_targets(&bind_plan) {
371 return Err(io::Error::new(
372 io::ErrorKind::InvalidInput,
373 "duplicate shard bind targets require transport_config.reuse_port = true",
374 ));
375 }
376
377 if Self::has_duplicate_bind_targets(&bind_plan) && !Self::supports_reuse_port_sharded_bind()
378 {
379 return Self::bind_shards_with_shared_socket(config, bind_plan).await;
380 }
381
382 let mut workers = Vec::with_capacity(bind_plan.len());
383 for bind_addr in bind_plan {
384 let mut worker_config = config.clone();
385 worker_config.bind_addr = bind_addr;
386 let socket = Self::bind_socket(
387 bind_addr,
388 worker_config.reuse_port,
389 worker_config.ipv6_only,
390 worker_config.socket_tuning,
391 )
392 .await?;
393 workers.push(Self::with_socket(worker_config, socket)?);
394 }
395 Ok(workers)
396 }
397
398 fn build_shard_bind_plan(config: &TransportConfig, shard_count: usize) -> Vec<SocketAddr> {
399 let targets = Self::bind_targets(config);
400 let effective_count = if config.split_ipv4_ipv6_bind {
401 shard_count.max(targets.len())
402 } else {
403 shard_count
404 };
405 let mut plan = Vec::with_capacity(effective_count);
406 for idx in 0..effective_count {
407 plan.push(targets[idx % targets.len()]);
408 }
409 plan
410 }
411
412 fn bind_targets(config: &TransportConfig) -> Vec<SocketAddr> {
413 if !config.split_ipv4_ipv6_bind {
414 return vec![config.bind_addr];
415 }
416
417 let port = config.bind_addr.port();
418 let v4 = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port));
419 let v6 = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0));
420 match config.bind_addr {
421 SocketAddr::V4(_) => vec![v4, v6],
422 SocketAddr::V6(_) => vec![v6, v4],
423 }
424 }
425
/// Returns `true` when at least one address occurs more than once in
/// `bind_plan`; stops scanning at the first repeat.
fn has_duplicate_bind_targets(bind_plan: &[SocketAddr]) -> bool {
    let mut seen = HashSet::with_capacity(bind_plan.len());
    // `insert` returns false for an address that is already present.
    bind_plan.iter().any(|target| !seen.insert(*target))
}
435
436 async fn bind_shards_with_shared_socket(
437 config: TransportConfig,
438 bind_plan: Vec<SocketAddr>,
439 ) -> io::Result<Vec<Self>> {
440 let mut base_sockets: HashMap<SocketAddr, std::net::UdpSocket> = HashMap::with_capacity(2);
441 let mut workers = Vec::with_capacity(bind_plan.len());
442 for bind_addr in bind_plan {
443 let socket = if let Some(base_socket) = base_sockets.get(&bind_addr) {
444 let clone = base_socket.try_clone()?;
445 clone.set_nonblocking(true)?;
446 UdpSocket::from_std(clone)?
447 } else {
448 let base_socket =
449 Self::bind_socket(bind_addr, false, config.ipv6_only, config.socket_tuning)
450 .await?;
451 let base_std = base_socket.into_std()?;
452 base_std.set_nonblocking(true)?;
453 let worker_std = base_std.try_clone()?;
454 worker_std.set_nonblocking(true)?;
455 base_sockets.insert(bind_addr, base_std);
456 UdpSocket::from_std(worker_std)?
457 };
458
459 let mut worker_config = config.clone();
460 worker_config.bind_addr = bind_addr;
461 workers.push(Self::with_socket(worker_config, socket)?);
462 }
463
464 Ok(workers)
465 }
466
467 fn with_socket(config: TransportConfig, socket: UdpSocket) -> io::Result<Self> {
468 let mut rate_limiter = RateLimiter::new(
469 config.per_ip_packet_limit,
470 config.global_packet_limit,
471 config.rate_window,
472 config.block_duration,
473 );
474 rate_limiter.set_processing_budget_config(config.processing_budget);
475 for ip in &config.rate_limit_exceptions {
476 rate_limiter.add_exception(*ip);
477 }
478 let now = Instant::now();
479 let cookie_key_current = SecretCookieKey::new(random_cookie_key());
480 let next_cookie_rotation = now + config.cookie_rotation_interval;
481 let advertisement_bytes = Bytes::copy_from_slice(config.advertisement.as_bytes());
482
483 Ok(Self {
484 socket,
485 recv_buffer: vec![0u8; config.mtu.max(MAXIMUM_MTU_SIZE as usize).max(2048)],
486 config,
487 advertisement_bytes,
488 rate_limiter,
489 proxy_router: None,
490 cookie_key_current,
491 cookie_key_previous: None,
492 next_cookie_rotation,
493 sessions: HashMap::new(),
494 session_pipelines: HashMap::new(),
495 pending_handshakes: HashMap::new(),
496 sessions_started_total: 0,
497 sessions_closed_total: 0,
498 packets_forwarded_total: 0,
499 bytes_forwarded_total: 0,
500 illegal_state_transitions: 0,
501 timed_out_sessions: 0,
502 local_requested_disconnects: 0,
503 remote_disconnect_notifications: 0,
504 remote_detect_lost_disconnects: 0,
505 keepalive_pings_sent: 0,
506 cookie_rotations: 0,
507 cookie_mismatch_drops: 0,
508 cookie_mismatch_blocks: 0,
509 handshake_stage_cancel_drops: 0,
510 handshake_req1_req2_timeouts: 0,
511 handshake_reply2_connect_timeouts: 0,
512 handshake_missing_req1_drops: 0,
513 handshake_auto_blocks: 0,
514 handshake_already_connected_rejects: 0,
515 handshake_ip_recently_connected_rejects: 0,
516 request2_server_addr_mismatch_drops: 0,
517 handshake_heuristics: HashMap::new(),
518 cookie_mismatch_guard_states: HashMap::new(),
519 ip_recently_connected_until: HashMap::new(),
520 request2_legacy_parse_hits: 0,
521 request2_legacy_drops: 0,
522 request2_ambiguous_parse_hits: 0,
523 request2_ambiguous_drops: 0,
524 proxy_inbound_reroutes: 0,
525 proxy_inbound_drops: 0,
526 proxy_outbound_reroutes: 0,
527 proxy_outbound_drops: 0,
528 })
529 }
530
531 async fn bind_socket(
532 addr: SocketAddr,
533 reuse_port: bool,
534 ipv6_only: bool,
535 socket_tuning: TransportSocketTuning,
536 ) -> io::Result<UdpSocket> {
537 let domain = if addr.is_ipv4() {
538 Domain::IPV4
539 } else {
540 Domain::IPV6
541 };
542
543 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
544 socket.set_reuse_address(true)?;
545 #[cfg(any(
546 target_os = "linux",
547 target_os = "android",
548 target_os = "macos",
549 target_os = "ios",
550 target_os = "freebsd",
551 target_os = "netbsd",
552 target_os = "openbsd"
553 ))]
554 if reuse_port {
555 socket.set_reuse_port(true)?;
556 }
557 #[cfg(not(any(
558 target_os = "linux",
559 target_os = "android",
560 target_os = "macos",
561 target_os = "ios",
562 target_os = "freebsd",
563 target_os = "netbsd",
564 target_os = "openbsd"
565 )))]
566 let _ = reuse_port;
567
568 if addr.is_ipv6() {
569 socket.set_only_v6(ipv6_only)?;
570 }
571 Self::apply_socket_tuning(&socket, addr, socket_tuning)?;
572 socket.set_nonblocking(true)?;
573 socket.bind(&addr.into())?;
574 let std_socket: std::net::UdpSocket = socket.into();
575 UdpSocket::from_std(std_socket)
576 }
577
578 fn apply_socket_tuning(
579 socket: &Socket,
580 addr: SocketAddr,
581 tuning: TransportSocketTuning,
582 ) -> io::Result<()> {
583 if let Some(size) = tuning.recv_buffer_size {
584 socket.set_recv_buffer_size(size)?;
585 }
586 if let Some(size) = tuning.send_buffer_size {
587 socket.set_send_buffer_size(size)?;
588 }
589
590 match addr {
591 SocketAddr::V4(_) => {
592 if let Some(ttl) = tuning.ipv4_ttl {
593 socket.set_ttl_v4(ttl)?;
594 }
595 #[cfg(not(any(
596 target_os = "fuchsia",
597 target_os = "redox",
598 target_os = "solaris",
599 target_os = "illumos",
600 target_os = "haiku",
601 )))]
602 if let Some(tos) = tuning.ipv4_tos {
603 socket.set_tos_v4(tos)?;
604 }
605
606 if tuning.disable_ip_fragmentation {
607 set_df_path_mtu_discovery_v4(socket)?;
608 }
609 }
610 SocketAddr::V6(_) => {
611 if let Some(hops) = tuning.ipv6_unicast_hops {
612 socket.set_unicast_hops_v6(hops)?;
613 }
614 if tuning.disable_ip_fragmentation {
615 set_df_path_mtu_discovery_v6(socket)?;
616 }
617 }
618 }
619
620 Ok(())
621 }
622
/// Returns the local address reported by the underlying socket.
///
/// # Errors
/// Propagates the I/O error from the socket's `local_addr` call.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
    self.socket.local_addr()
}
626
/// Returns the number of entries currently in the session map.
pub fn session_count(&self) -> usize {
    self.sessions.len()
}
630
/// Receives one UDP datagram, processes it, and reports the outcome as a
/// [`TransportEvent`].
///
/// Processing order: proxy routing, periodic housekeeping (pruning, rate
/// limiter tick, cookie key rotation), rate limiting, then either offline
/// packet handling or connected datagram handling through the peer's
/// `Session` and `SessionPipeline`.
///
/// # Errors
/// Returns an `io::Error` when receiving fails or when one of the awaited
/// helpers (`send_connection_banned`, `handle_offline_packet`,
/// `send_datagram`) fails. Decode failures are not errors; they are
/// reported as `TransportEvent::DecodeError`.
pub async fn recv_and_process(&mut self) -> io::Result<TransportEvent> {
    let (len, observed_addr) = self.socket.recv_from(&mut self.recv_buffer).await?;
    let now = Instant::now();
    // Falls back to the configured bind address if the socket cannot report
    // its local address.
    let local_addr = self.local_addr().unwrap_or(self.config.bind_addr);
    // With a proxy router installed, the session address may differ from the
    // observed source address; everything below is keyed by `addr`.
    let addr = if let Some(router) = &self.proxy_router {
        match router.route_inbound(observed_addr, local_addr) {
            InboundProxyRoute::Local { session_addr } => {
                if session_addr != observed_addr {
                    self.proxy_inbound_reroutes = self.proxy_inbound_reroutes.saturating_add(1);
                }
                session_addr
            }
            InboundProxyRoute::Drop => {
                self.proxy_inbound_drops = self.proxy_inbound_drops.saturating_add(1);
                return Ok(TransportEvent::ProxyDropped {
                    addr: observed_addr,
                });
            }
        }
    } else {
        observed_addr
    };

    // Housekeeping runs on every received datagram, before any decoding.
    self.prune_pending_handshakes(now);
    self.prune_cookie_mismatch_guard_states(now);
    self.prune_ip_recently_connected(now);
    self.prune_idle_sessions(now, Some(addr));
    self.rate_limiter.tick(now);
    self.rotate_cookie_keys_if_needed(now);

    let slice = &self.recv_buffer[..len];
    // A zero-length datagram has no packet id to dispatch on.
    let Some(first) = slice.first().copied() else {
        return Ok(TransportEvent::DecodeError {
            addr,
            error: DecodeError::UnexpectedEof,
        });
    };

    let is_offline = is_offline_packet_id(first);
    match self.rate_limiter.check(addr.ip(), now) {
        RateLimitDecision::Allow => {}
        RateLimitDecision::GlobalLimit => {
            return Ok(TransportEvent::RateLimited { addr });
        }
        RateLimitDecision::IpBlocked { newly_blocked, .. } => {
            // Only the offline packet that triggered a new block gets a
            // "connection banned" reply.
            if is_offline && newly_blocked {
                self.send_connection_banned(addr).await?;
            }
            return Ok(TransportEvent::RateLimited { addr });
        }
    }

    if is_offline {
        let mut payload = slice;
        let packet =
            match OfflinePacket::decode_with_magic(&mut payload, self.config.unconnected_magic)
            {
                Ok(packet) => packet,
                Err(error) => return Ok(TransportEvent::DecodeError { addr, error }),
            };

        // The handler may produce a more specific event; otherwise the
        // decoded packet itself is surfaced.
        if let Some(event) = self.handle_offline_packet(addr, &packet, now).await? {
            return Ok(event);
        }

        return Ok(TransportEvent::OfflinePacket { addr, packet });
    }

    let mut payload = slice;
    let datagram = match Datagram::decode(&mut payload) {
        Ok(d) => d,
        Err(error) => return Ok(TransportEvent::DecodeError { addr, error }),
    };

    // Connected datagrams are only accepted from peers that already have a
    // session or a pending handshake.
    if !self.sessions.contains_key(&addr) && !self.pending_handshakes.contains_key(&addr) {
        return Ok(TransportEvent::ConnectedDatagramDroppedNoSession { addr });
    }

    // A new session would be created below; refuse if the table is full.
    if !self.sessions.contains_key(&addr) && self.sessions.len() >= self.config.max_sessions {
        return Ok(TransportEvent::SessionLimitReached { addr });
    }

    // The processing budget is charged only for frame-carrying datagrams
    // from addresses that are not rate limit exceptions.
    if matches!(&datagram.payload, DatagramPayload::Frames(_))
        && !self.rate_limiter.is_exception(addr.ip())
    {
        let processing_cost = estimate_connected_datagram_processing_cost(len, &datagram);
        let budget_decision =
            self.rate_limiter
                .consume_processing_budget(addr.ip(), processing_cost, now);
        match budget_decision {
            ProcessingBudgetDecision::Allow => {}
            ProcessingBudgetDecision::IpExhausted
            | ProcessingBudgetDecision::GlobalExhausted => {
                warn!(
                    %addr,
                    cost_units = processing_cost,
                    decision = ?budget_decision,
                    "dropping connected datagram due to processing budget exhaustion"
                );
                return Ok(TransportEvent::RateLimited { addr });
            }
        }
    }

    let server_addr = self.local_addr().unwrap_or(self.config.bind_addr);
    let now_millis = unix_timestamp_millis();
    let unhandled_queue_max_frames = self.config.unhandled_queue_max_frames;
    let unhandled_queue_max_bytes = self.config.unhandled_queue_max_bytes;

    // Outcome flags set inside the session block and acted on afterwards,
    // once the mutable borrows of the session maps have ended.
    let mut decode_error: Option<DecodeError> = None;
    let mut close_session = false;
    let mut illegal_transition = false;
    let mut remote_disconnect_reason: Option<RemoteDisconnectReason> = None;

    let (mut frames, receipts, datagrams_to_send, became_connected) = {
        let session_tunables = self.config.session_tunables.clone();
        // Creates the session and its pipeline on first use.
        let session = self
            .sessions
            .entry(addr)
            .or_insert_with(|| Session::with_tunables(self.config.mtu, session_tunables));
        let pipeline = self.session_pipelines.entry(addr).or_insert_with(|| {
            SessionPipeline::new(unhandled_queue_max_frames, unhandled_queue_max_bytes)
        });
        session.touch_activity(now);

        let frames = match session.ingest_datagram(datagram, now) {
            Ok(frames) => frames,
            Err(error) => return Ok(TransportEvent::DecodeError { addr, error }),
        };

        let receipts = session.process_incoming_receipts(now);
        let mut app_frames = Vec::new();
        let was_connected = session.state() == SessionState::Connected;

        for frame in frames {
            // Frames with an empty payload carry no packet id and are skipped.
            let Some(id) = frame.payload.first().copied() else {
                continue;
            };

            if !is_connected_control_id(id) {
                // Application frame: the pipeline delivers or queues it; an
                // overflow closes the session.
                match pipeline.route_inbound_app_frame(session.state(), frame, &mut app_frames)
                {
                    PipelineFrameAction::Deliver | PipelineFrameAction::Queued => {}
                    PipelineFrameAction::Overflow => {
                        close_session = true;
                        break;
                    }
                }
                continue;
            }

            let mut control_payload = &frame.payload[..];
            let control_packet = match ConnectedControlPacket::decode(&mut control_payload) {
                Ok(pkt) => pkt,
                Err(error) => {
                    decode_error = Some(error);
                    break;
                }
            };

            match Self::apply_connected_control(
                session,
                addr,
                server_addr,
                now_millis,
                control_packet,
            ) {
                Ok(ControlAction::None) => {}
                Ok(ControlAction::CloseSession {
                    remote_reason,
                    illegal_state,
                }) => {
                    close_session = true;
                    illegal_transition = illegal_state;
                    if remote_disconnect_reason.is_none() {
                        remote_disconnect_reason = remote_reason;
                    }
                    break;
                }
                Err(error) => {
                    decode_error = Some(error);
                    break;
                }
            }
        }

        // The flush result is deliberately ignored here.
        if !close_session && decode_error.is_none() {
            let _ = pipeline.flush_if_connected(session.state(), &mut app_frames);
        }

        // A closing session produces no further outbound datagrams.
        let immediate_datagrams = if close_session {
            Vec::new()
        } else {
            session.on_tick(
                now,
                RECV_PATH_MAX_NEW_DATAGRAMS,
                self.config.mtu.saturating_mul(RECV_PATH_MAX_NEW_DATAGRAMS),
                RECV_PATH_MAX_RESEND_DATAGRAMS,
                self.config
                    .mtu
                    .saturating_mul(RECV_PATH_MAX_RESEND_DATAGRAMS),
            )
        };
        if session.take_backpressure_disconnect() {
            close_session = true;
        }

        let app_frames: Vec<ConnectedFrameDelivery> = app_frames
            .into_iter()
            .map(ConnectedFrameDelivery::from_frame)
            .collect();
        let became_connected = !was_connected && session.state() == SessionState::Connected;

        (app_frames, receipts, immediate_datagrams, became_connected)
    };

    // A control packet decode failure takes precedence over everything else
    // and is reported without sending or closing anything.
    if let Some(error) = decode_error {
        return Ok(TransportEvent::DecodeError { addr, error });
    }

    if illegal_transition {
        self.illegal_state_transitions = self.illegal_state_transitions.saturating_add(1);
    } else {
        for datagram in &datagrams_to_send {
            self.send_datagram(addr, datagram).await?;
        }
    }

    if close_session {
        // Frames gathered before the close are not delivered.
        frames.clear();
        if let Some(reason) = remote_disconnect_reason {
            self.record_remote_disconnect(reason);
        }
        self.close_session(addr);
        if let Some(reason) = remote_disconnect_reason {
            return Ok(TransportEvent::PeerDisconnected { addr, reason });
        }
    }

    if became_connected {
        self.sessions_started_total = self.sessions_started_total.saturating_add(1);
    }

    // The client GUID is reported once, on the datagram that completed the
    // connection, and comes from the pending handshake entry.
    let client_guid = if became_connected {
        self.pending_handshakes
            .get(&addr)
            .and_then(|pending| pending.client_guid)
    } else {
        None
    };

    // Once the session is connected the handshake entry is no longer needed.
    if self
        .sessions
        .get(&addr)
        .is_some_and(|session| session.state() == SessionState::Connected)
    {
        self.pending_handshakes.remove(&addr);
    }

    let frame_count = frames.len();
    let forwarded_bytes = frames
        .iter()
        .map(|frame| frame.payload.len() as u64)
        .sum::<u64>();
    self.packets_forwarded_total = self
        .packets_forwarded_total
        .saturating_add(frame_count as u64);
    self.bytes_forwarded_total = self.bytes_forwarded_total.saturating_add(forwarded_bytes);
    Ok(TransportEvent::ConnectedFrames {
        addr,
        client_guid,
        frames,
        frame_count,
        receipts,
    })
}
907
/// Returns a reference to the server's current transport configuration.
pub fn config(&self) -> &TransportConfig {
    &self.config
}
911
/// Installs `router` as the proxy router, replacing any previous one.
pub fn set_proxy_router(&mut self, router: Arc<dyn ProxyRouter>) {
    self.proxy_router = Some(router);
}
915
/// Removes the proxy router, if one is installed.
pub fn clear_proxy_router(&mut self) {
    self.proxy_router = None;
}
919
/// Registers `ip` as an exception with the rate limiter.
pub fn add_rate_limit_exception(&mut self, ip: IpAddr) {
    self.rate_limiter.add_exception(ip);
}
923
/// Removes `ip` from the rate limiter's exceptions.
pub fn remove_rate_limit_exception(&mut self, ip: IpAddr) {
    self.rate_limiter.remove_exception(ip);
}
927
928 pub fn rate_limit_config(&self) -> TransportRateLimitConfig {
929 let cfg: RateLimiterConfigSnapshot = self.rate_limiter.config_snapshot();
930 TransportRateLimitConfig {
931 per_ip_packet_limit: cfg.per_ip_limit,
932 global_packet_limit: cfg.global_limit,
933 rate_window: cfg.window,
934 block_duration: cfg.block_duration,
935 }
936 }
937
938 pub fn processing_budget_config(&self) -> TransportProcessingBudgetConfig {
939 let cfg = self.rate_limiter.processing_budget_config();
940 TransportProcessingBudgetConfig {
941 enabled: cfg.enabled,
942 per_ip_refill_units_per_sec: cfg.per_ip_refill_units_per_sec,
943 per_ip_burst_units: cfg.per_ip_burst_units,
944 global_refill_units_per_sec: cfg.global_refill_units_per_sec,
945 global_burst_units: cfg.global_burst_units,
946 bucket_idle_ttl: cfg.bucket_idle_ttl,
947 }
948 }
949
950 pub fn set_processing_budget_config(&mut self, config: TransportProcessingBudgetConfig) {
951 let raw = ProcessingBudgetConfig {
952 enabled: config.enabled,
953 per_ip_refill_units_per_sec: config.per_ip_refill_units_per_sec,
954 per_ip_burst_units: config.per_ip_burst_units,
955 global_refill_units_per_sec: config.global_refill_units_per_sec,
956 global_burst_units: config.global_burst_units,
957 bucket_idle_ttl: config.bucket_idle_ttl,
958 };
959 self.rate_limiter.set_processing_budget_config(raw);
960 self.config.processing_budget = self.rate_limiter.processing_budget_config();
961 }
962
963 pub fn set_rate_limit_config(&mut self, config: TransportRateLimitConfig) {
964 self.rate_limiter.update_limits(
965 config.per_ip_packet_limit,
966 config.global_packet_limit,
967 config.rate_window,
968 config.block_duration,
969 );
970 let effective = self.rate_limit_config();
971 self.config.per_ip_packet_limit = effective.per_ip_packet_limit;
972 self.config.global_packet_limit = effective.global_packet_limit;
973 self.config.rate_window = effective.rate_window;
974 self.config.block_duration = effective.block_duration;
975 }
976
977 pub fn set_per_ip_packet_limit(&mut self, limit: usize) {
978 self.rate_limiter.set_per_ip_limit(limit);
979 self.config.per_ip_packet_limit = self.rate_limit_config().per_ip_packet_limit;
980 }
981
982 pub fn set_global_packet_limit(&mut self, limit: usize) {
983 self.rate_limiter.set_global_limit(limit);
984 self.config.global_packet_limit = self.rate_limit_config().global_packet_limit;
985 }
986
987 pub fn set_rate_window(&mut self, window: Duration) {
988 self.rate_limiter.set_window(window);
989 self.config.rate_window = self.rate_limit_config().rate_window;
990 }
991
992 pub fn set_block_duration(&mut self, block_duration: Duration) {
993 self.rate_limiter.set_block_duration(block_duration);
994 self.config.block_duration = self.rate_limit_config().block_duration;
995 }
996
/// Asks the rate limiter to block `ip` and returns the limiter's result.
///
/// NOTE(review): the meaning of the returned flag (presumably "newly
/// blocked") and the block duration used are decided by
/// `RateLimiter::block_address` -- confirm there.
pub fn block_address(&mut self, ip: IpAddr) -> bool {
    self.rate_limiter.block_address(ip)
}
1000
1001 pub fn block_address_for(&mut self, ip: IpAddr, duration: Duration) -> bool {
1002 self.rate_limiter
1003 .block_address_for(ip, Instant::now(), duration)
1004 }
1005
/// Asks the rate limiter to unblock `ip` and returns the limiter's result.
///
/// NOTE(review): presumably `true` when the address was blocked before the
/// call -- confirm in `RateLimiter::unblock_address`.
pub fn unblock_address(&mut self, ip: IpAddr) -> bool {
    self.rate_limiter.unblock_address(ip)
}
1009
1010 pub fn disconnect_peer(&mut self, addr: SocketAddr) -> bool {
1011 let exists =
1012 self.sessions.contains_key(&addr) || self.pending_handshakes.contains_key(&addr);
1013 if exists {
1014 self.local_requested_disconnects = self.local_requested_disconnects.saturating_add(1);
1015 self.close_session(addr);
1016 }
1017 exists
1018 }
1019
1020 pub fn metrics_snapshot(&self) -> TransportMetricsSnapshot {
1021 let mut total = TransportMetricsSnapshot {
1022 session_count: self.sessions.len(),
1023 sessions_started_total: self.sessions_started_total,
1024 sessions_closed_total: self.sessions_closed_total,
1025 packets_forwarded_total: self.packets_forwarded_total,
1026 bytes_forwarded_total: self.bytes_forwarded_total,
1027 illegal_state_transitions: self.illegal_state_transitions,
1028 timed_out_sessions: self.timed_out_sessions,
1029 local_requested_disconnects: self.local_requested_disconnects,
1030 remote_disconnect_notifications: self.remote_disconnect_notifications,
1031 remote_detect_lost_disconnects: self.remote_detect_lost_disconnects,
1032 keepalive_pings_sent: self.keepalive_pings_sent,
1033 cookie_rotations: self.cookie_rotations,
1034 cookie_mismatch_drops: self.cookie_mismatch_drops,
1035 cookie_mismatch_blocks: self.cookie_mismatch_blocks,
1036 handshake_stage_cancel_drops: self.handshake_stage_cancel_drops,
1037 handshake_req1_req2_timeouts: self.handshake_req1_req2_timeouts,
1038 handshake_reply2_connect_timeouts: self.handshake_reply2_connect_timeouts,
1039 handshake_missing_req1_drops: self.handshake_missing_req1_drops,
1040 handshake_auto_blocks: self.handshake_auto_blocks,
1041 handshake_already_connected_rejects: self.handshake_already_connected_rejects,
1042 handshake_ip_recently_connected_rejects: self.handshake_ip_recently_connected_rejects,
1043 request2_server_addr_mismatch_drops: self.request2_server_addr_mismatch_drops,
1044 request2_legacy_parse_hits: self.request2_legacy_parse_hits,
1045 request2_legacy_drops: self.request2_legacy_drops,
1046 request2_ambiguous_parse_hits: self.request2_ambiguous_parse_hits,
1047 request2_ambiguous_drops: self.request2_ambiguous_drops,
1048 proxy_inbound_reroutes: self.proxy_inbound_reroutes,
1049 proxy_inbound_drops: self.proxy_inbound_drops,
1050 proxy_outbound_reroutes: self.proxy_outbound_reroutes,
1051 proxy_outbound_drops: self.proxy_outbound_drops,
1052 ..TransportMetricsSnapshot::default()
1053 };
1054 let mut srtt_sum = 0.0;
1055 let mut rttvar_sum = 0.0;
1056 let mut resend_rto_sum = 0.0;
1057 let mut cwnd_sum = 0.0;
1058
1059 for session in self.sessions.values() {
1060 let s: SessionMetricsSnapshot = session.metrics_snapshot();
1061 total.pending_outgoing_frames += s.pending_outgoing_frames;
1062 total.pending_outgoing_bytes += s.pending_outgoing_bytes;
1063 total.ingress_datagrams = total.ingress_datagrams.saturating_add(s.ingress_datagrams);
1064 total.ingress_frames = total.ingress_frames.saturating_add(s.ingress_frames);
1065 total.duplicate_reliable_drops = total
1066 .duplicate_reliable_drops
1067 .saturating_add(s.duplicate_reliable_drops);
1068 total.ordered_stale_drops = total
1069 .ordered_stale_drops
1070 .saturating_add(s.ordered_stale_drops);
1071 total.ordered_buffer_full_drops = total
1072 .ordered_buffer_full_drops
1073 .saturating_add(s.ordered_buffer_full_drops);
1074 total.sequenced_stale_drops = total
1075 .sequenced_stale_drops
1076 .saturating_add(s.sequenced_stale_drops);
1077 total.sequenced_missing_index_drops = total
1078 .sequenced_missing_index_drops
1079 .saturating_add(s.sequenced_missing_index_drops);
1080 total.reliable_sent_datagrams = total
1081 .reliable_sent_datagrams
1082 .saturating_add(s.reliable_sent_datagrams);
1083 total.resent_datagrams = total.resent_datagrams.saturating_add(s.resent_datagrams);
1084 total.ack_out_total = total.ack_out_total.saturating_add(s.ack_out_datagrams);
1085 total.nack_out_total = total.nack_out_total.saturating_add(s.nack_out_datagrams);
1086 total.acked_datagrams = total.acked_datagrams.saturating_add(s.acked_datagrams);
1087 total.nacked_datagrams = total.nacked_datagrams.saturating_add(s.nacked_datagrams);
1088 total.split_ttl_drops = total.split_ttl_drops.saturating_add(s.split_ttl_drops);
1089 total.outgoing_queue_drops = total
1090 .outgoing_queue_drops
1091 .saturating_add(s.outgoing_queue_drops);
1092 total.outgoing_queue_defers = total
1093 .outgoing_queue_defers
1094 .saturating_add(s.outgoing_queue_defers);
1095 total.outgoing_queue_disconnects = total
1096 .outgoing_queue_disconnects
1097 .saturating_add(s.outgoing_queue_disconnects);
1098 total.backpressure_delays = total
1099 .backpressure_delays
1100 .saturating_add(s.backpressure_delays);
1101 total.backpressure_drops = total
1102 .backpressure_drops
1103 .saturating_add(s.backpressure_drops);
1104 total.backpressure_disconnects = total
1105 .backpressure_disconnects
1106 .saturating_add(s.backpressure_disconnects);
1107
1108 srtt_sum += s.srtt_ms;
1109 rttvar_sum += s.rttvar_ms;
1110 resend_rto_sum += s.resend_rto_ms;
1111 cwnd_sum += s.congestion_window_packets;
1112 }
1113
1114 for pipeline in self.session_pipelines.values() {
1115 let p: SessionPipelineMetricsSnapshot = pipeline.metrics_snapshot();
1116 total.pending_unhandled_frames += p.pending_unhandled_frames;
1117 total.pending_unhandled_bytes += p.pending_unhandled_bytes;
1118 total.unhandled_frames_queued = total
1119 .unhandled_frames_queued
1120 .saturating_add(p.unhandled_frames_queued);
1121 total.unhandled_frames_flushed = total
1122 .unhandled_frames_flushed
1123 .saturating_add(p.unhandled_frames_flushed);
1124 total.unhandled_frames_dropped = total
1125 .unhandled_frames_dropped
1126 .saturating_add(p.unhandled_frames_dropped);
1127 }
1128
1129 let r: RateLimiterMetricsSnapshot = self.rate_limiter.metrics_snapshot();
1130 let p: ProcessingBudgetMetricsSnapshot =
1131 self.rate_limiter.processing_budget_metrics_snapshot();
1132 total.rate_global_limit_hits = r.global_limit_hits;
1133 total.rate_ip_block_hits = r.ip_block_hits;
1134 total.rate_ip_block_hits_rate_exceeded = r.ip_block_hits_rate_exceeded;
1135 total.rate_ip_block_hits_manual = r.ip_block_hits_manual;
1136 total.rate_ip_block_hits_handshake_heuristic = r.ip_block_hits_handshake_heuristic;
1137 total.rate_ip_block_hits_cookie_mismatch_guard = r.ip_block_hits_cookie_mismatch_guard;
1138 total.rate_addresses_blocked = r.addresses_blocked;
1139 total.rate_addresses_blocked_rate_exceeded = r.addresses_blocked_rate_exceeded;
1140 total.rate_addresses_blocked_manual = r.addresses_blocked_manual;
1141 total.rate_addresses_blocked_handshake_heuristic = r.addresses_blocked_handshake_heuristic;
1142 total.rate_addresses_blocked_cookie_mismatch_guard =
1143 r.addresses_blocked_cookie_mismatch_guard;
1144 total.rate_addresses_unblocked = r.addresses_unblocked;
1145 total.rate_blocked_addresses = r.blocked_addresses;
1146 total.rate_exception_addresses = r.exception_addresses;
1147 total.processing_budget_drops_total = p.drops_total;
1148 total.processing_budget_drops_ip_exhausted_total = p.drops_ip_exhausted;
1149 total.processing_budget_drops_global_exhausted_total = p.drops_global_exhausted;
1150 total.processing_budget_consumed_units_total = p.consumed_units_total;
1151 total.processing_budget_active_ip_buckets = p.active_ip_buckets;
1152
1153 if total.session_count > 0 {
1154 let denom = total.session_count as f64;
1155 total.avg_srtt_ms = srtt_sum / denom;
1156 total.avg_rttvar_ms = rttvar_sum / denom;
1157 total.avg_resend_rto_ms = resend_rto_sum / denom;
1158 total.avg_congestion_window_packets = cwnd_sum / denom;
1159 }
1160
1161 total.resend_ratio = if total.reliable_sent_datagrams == 0 {
1162 0.0
1163 } else {
1164 total.resent_datagrams as f64 / total.reliable_sent_datagrams as f64
1165 };
1166
1167 total
1168 }
1169
1170 pub async fn flush_resends(
1171 &mut self,
1172 max_per_session: usize,
1173 max_bytes_per_session: usize,
1174 ) -> io::Result<usize> {
1175 self.tick_outbound(0, 0, max_per_session, max_bytes_per_session)
1176 .await
1177 }
1178
1179 pub fn queue_payload(
1180 &mut self,
1181 addr: SocketAddr,
1182 payload: Bytes,
1183 reliability: Reliability,
1184 channel: u8,
1185 priority: RakPriority,
1186 ) -> QueueDispatchResult {
1187 self.queue_payload_with_optional_receipt(
1188 addr,
1189 payload,
1190 reliability,
1191 channel,
1192 priority,
1193 None,
1194 )
1195 }
1196
1197 pub fn queue_payload_with_receipt(
1198 &mut self,
1199 addr: SocketAddr,
1200 payload: Bytes,
1201 reliability: Reliability,
1202 channel: u8,
1203 priority: RakPriority,
1204 receipt_id: u64,
1205 ) -> QueueDispatchResult {
1206 self.queue_payload_with_optional_receipt(
1207 addr,
1208 payload,
1209 reliability,
1210 channel,
1211 priority,
1212 Some(receipt_id),
1213 )
1214 }
1215
1216 fn queue_payload_with_optional_receipt(
1217 &mut self,
1218 addr: SocketAddr,
1219 payload: Bytes,
1220 reliability: Reliability,
1221 channel: u8,
1222 priority: RakPriority,
1223 receipt_id: Option<u64>,
1224 ) -> QueueDispatchResult {
1225 let Some(session) = self.sessions.get_mut(&addr) else {
1226 return QueueDispatchResult::MissingSession;
1227 };
1228 let decision =
1229 session.queue_payload_with_receipt(payload, reliability, channel, priority, receipt_id);
1230 let disconnect = matches!(decision, QueuePayloadResult::DisconnectRequested)
1231 || session.take_backpressure_disconnect();
1232 if disconnect {
1233 self.close_session(addr);
1234 return QueueDispatchResult::Disconnected;
1235 }
1236
1237 match decision {
1238 QueuePayloadResult::Enqueued { reliable_bytes } => {
1239 QueueDispatchResult::Enqueued { reliable_bytes }
1240 }
1241 QueuePayloadResult::Dropped => QueueDispatchResult::Dropped,
1242 QueuePayloadResult::Deferred => QueueDispatchResult::Deferred,
1243 QueuePayloadResult::DisconnectRequested => QueueDispatchResult::Disconnected,
1244 }
1245 }
1246
1247 pub async fn tick_outbound(
1248 &mut self,
1249 max_new_datagrams_per_session: usize,
1250 max_new_bytes_per_session: usize,
1251 max_resend_datagrams_per_session: usize,
1252 max_resend_bytes_per_session: usize,
1253 ) -> io::Result<usize> {
1254 let now = Instant::now();
1255 self.prune_pending_handshakes(now);
1256 self.prune_cookie_mismatch_guard_states(now);
1257 self.prune_ip_recently_connected(now);
1258 self.prune_idle_sessions(now, None);
1259 self.queue_keepalive_pings(now);
1260 self.rate_limiter.tick(now);
1261 self.rotate_cookie_keys_if_needed(now);
1262
1263 let mut pending = Vec::new();
1264
1265 for (addr, session) in &mut self.sessions {
1266 let datagrams = session.on_tick(
1267 now,
1268 max_new_datagrams_per_session,
1269 max_new_bytes_per_session,
1270 max_resend_datagrams_per_session,
1271 max_resend_bytes_per_session,
1272 );
1273 for d in datagrams {
1274 pending.push((*addr, d));
1275 }
1276 }
1277
1278 for (addr, datagram) in &pending {
1279 self.send_datagram(*addr, datagram).await?;
1280 }
1281
1282 Ok(pending.len())
1283 }
1284
1285 pub async fn send_datagram(&mut self, addr: SocketAddr, datagram: &Datagram) -> io::Result<()> {
1286 let Some(target_addr) = self.route_outbound_target(addr) else {
1287 return Ok(());
1288 };
1289 let mut out = BytesMut::with_capacity(datagram.encoded_size());
1290 datagram.encode(&mut out).map_err(invalid_data_io_error)?;
1291 let _written = self.socket.send_to(&out, target_addr).await?;
1292 Ok(())
1293 }
1294
1295 async fn send_offline_packet(
1296 &mut self,
1297 addr: SocketAddr,
1298 packet: &OfflinePacket,
1299 ) -> io::Result<()> {
1300 let Some(target_addr) = self.route_outbound_target(addr) else {
1301 return Ok(());
1302 };
1303 let mut out = BytesMut::new();
1304 packet.encode(&mut out).map_err(invalid_data_io_error)?;
1305 let _written = self.socket.send_to(&out, target_addr).await?;
1306 Ok(())
1307 }
1308
1309 async fn send_no_free_incoming_connections(&mut self, addr: SocketAddr) -> io::Result<()> {
1310 let packet = OfflinePacket::NoFreeIncomingConnections(NoFreeIncomingConnections {
1311 server_guid: self.config.server_guid,
1312 magic: self.config.unconnected_magic,
1313 });
1314 self.send_offline_packet(addr, &packet).await
1315 }
1316
1317 async fn send_connection_banned(&mut self, addr: SocketAddr) -> io::Result<()> {
1318 let packet = OfflinePacket::ConnectionBanned(ConnectionBanned {
1319 server_guid: self.config.server_guid,
1320 magic: self.config.unconnected_magic,
1321 });
1322 self.send_offline_packet(addr, &packet).await
1323 }
1324
1325 async fn send_already_connected(&mut self, addr: SocketAddr) -> io::Result<()> {
1326 let packet = OfflinePacket::AlreadyConnected(AlreadyConnected {
1327 server_guid: self.config.server_guid,
1328 magic: self.config.unconnected_magic,
1329 });
1330 self.send_offline_packet(addr, &packet).await
1331 }
1332
1333 async fn send_ip_recently_connected(&mut self, addr: SocketAddr) -> io::Result<()> {
1334 let packet = OfflinePacket::IpRecentlyConnected(IpRecentlyConnected {
1335 server_guid: self.config.server_guid,
1336 magic: self.config.unconnected_magic,
1337 });
1338 self.send_offline_packet(addr, &packet).await
1339 }
1340
1341 fn route_outbound_target(&mut self, addr: SocketAddr) -> Option<SocketAddr> {
1342 let Some(router) = self.proxy_router.as_ref() else {
1343 return Some(addr);
1344 };
1345
1346 let local_addr = self.local_addr().unwrap_or(self.config.bind_addr);
1347 match router.route_outbound(addr, local_addr) {
1348 OutboundProxyRoute::Send { target_addr } => {
1349 if target_addr != addr {
1350 self.proxy_outbound_reroutes = self.proxy_outbound_reroutes.saturating_add(1);
1351 }
1352 Some(target_addr)
1353 }
1354 OutboundProxyRoute::Drop => {
1355 self.proxy_outbound_drops = self.proxy_outbound_drops.saturating_add(1);
1356 None
1357 }
1358 }
1359 }
1360
1361 async fn handle_offline_packet(
1362 &mut self,
1363 addr: SocketAddr,
1364 packet: &OfflinePacket,
1365 now: Instant,
1366 ) -> io::Result<Option<TransportEvent>> {
1367 match packet {
1368 OfflinePacket::UnconnectedPing(ping)
1369 | OfflinePacket::UnconnectedPingOpenConnections(ping) => {
1370 let pong = OfflinePacket::UnconnectedPong(UnconnectedPong {
1371 ping_time: ping.ping_time,
1372 server_guid: self.config.server_guid,
1373 magic: self.config.unconnected_magic,
1374 motd: self.advertisement_bytes.clone(),
1375 });
1376 self.send_offline_packet(addr, &pong).await?;
1377 }
1378 OfflinePacket::OpenConnectionRequest1(req1) => {
1379 if !supports_protocol(&self.config.supported_protocols, req1.protocol_version) {
1380 warn!(
1381 %addr,
1382 protocol_version = req1.protocol_version,
1383 "rejecting request1: incompatible protocol version"
1384 );
1385 let incompatible =
1386 OfflinePacket::IncompatibleProtocolVersion(IncompatibleProtocolVersion {
1387 protocol_version: primary_protocol_version(
1388 &self.config.supported_protocols,
1389 ),
1390 server_guid: self.config.server_guid,
1391 magic: self.config.unconnected_magic,
1392 });
1393 self.send_offline_packet(addr, &incompatible).await?;
1394 return Ok(None);
1395 }
1396
1397 if self.is_ip_recently_connected(addr, now) {
1398 self.handshake_ip_recently_connected_rejects = self
1399 .handshake_ip_recently_connected_rejects
1400 .saturating_add(1);
1401 warn!(%addr, "rejecting request1: ip recently connected");
1402 self.send_ip_recently_connected(addr).await?;
1403 return Ok(None);
1404 }
1405
1406 if self.has_active_session_for_offline_reject(addr) {
1407 self.handshake_already_connected_rejects =
1408 self.handshake_already_connected_rejects.saturating_add(1);
1409 warn!(%addr, "rejecting request1: already connected");
1410 self.send_already_connected(addr).await?;
1411 return Ok(None);
1412 }
1413
1414 if self.pending_handshakes.get(&addr).is_some_and(|pending| {
1415 pending.stage == PendingHandshakeStage::AwaitingConnectionRequest
1416 }) {
1417 self.handshake_already_connected_rejects =
1418 self.handshake_already_connected_rejects.saturating_add(1);
1419 warn!(%addr, "rejecting request1: handshake already in progress");
1420 self.send_already_connected(addr).await?;
1421 return Ok(None);
1422 }
1423
1424 if self.would_exceed_session_limit(addr) {
1425 warn!(%addr, "rejecting request1: session limit reached");
1426 self.send_no_free_incoming_connections(addr).await?;
1427 return Ok(Some(TransportEvent::SessionLimitReached { addr }));
1428 }
1429
1430 let mtu = self.negotiate_mtu(req1.mtu);
1431 let previous_cookie = self.pending_handshakes.get(&addr).and_then(|pending| {
1432 if pending.stage == PendingHandshakeStage::AwaitingRequest2 {
1433 pending.cookie
1434 } else {
1435 None
1436 }
1437 });
1438 let cookie = if self.config.send_cookie {
1439 previous_cookie.or_else(|| Some(self.generate_cookie(addr)))
1440 } else {
1441 None
1442 };
1443
1444 self.pending_handshakes.insert(
1445 addr,
1446 PendingHandshake {
1447 mtu,
1448 cookie,
1449 client_guid: None,
1450 stage: PendingHandshakeStage::AwaitingRequest2,
1451 expires_at: now + self.config.handshake_req1_req2_timeout(),
1452 },
1453 );
1454
1455 self.sessions.remove(&addr);
1456 self.session_pipelines.remove(&addr);
1457 self.sessions.insert(
1458 addr,
1459 Session::with_tunables(mtu as usize, self.config.session_tunables.clone()),
1460 );
1461 let mut valid_transition = true;
1462 if let Some(session) = self.sessions.get_mut(&addr) {
1463 valid_transition = Self::apply_session_transitions(
1464 session,
1465 &[SessionState::Req1Recv, SessionState::Reply1Sent],
1466 );
1467 }
1468 if !valid_transition {
1469 self.record_illegal_state_transition(addr);
1470 return Ok(None);
1471 }
1472
1473 let reply = OfflinePacket::OpenConnectionReply1(OpenConnectionReply1 {
1474 server_guid: self.config.server_guid,
1475 mtu,
1476 cookie,
1477 magic: self.config.unconnected_magic,
1478 });
1479 self.send_offline_packet(addr, &reply).await?;
1480 }
1481 OfflinePacket::OpenConnectionRequest2(req2) => {
1482 if self.is_ip_recently_connected(addr, now) {
1483 self.handshake_ip_recently_connected_rejects = self
1484 .handshake_ip_recently_connected_rejects
1485 .saturating_add(1);
1486 warn!(%addr, "rejecting request2: ip recently connected");
1487 self.send_ip_recently_connected(addr).await?;
1488 return Ok(None);
1489 }
1490
1491 if self.has_active_session_for_offline_reject(addr) {
1492 self.handshake_already_connected_rejects =
1493 self.handshake_already_connected_rejects.saturating_add(1);
1494 warn!(%addr, "rejecting request2: already connected");
1495 self.send_already_connected(addr).await?;
1496 return Ok(None);
1497 }
1498
1499 match req2.parse_path {
1500 Request2ParsePath::LegacyHeuristic => {
1501 self.request2_legacy_parse_hits =
1502 self.request2_legacy_parse_hits.saturating_add(1);
1503 if !self.config.allow_legacy_request2_fallback {
1504 self.request2_legacy_drops =
1505 self.request2_legacy_drops.saturating_add(1);
1506 self.handshake_stage_cancel_drops =
1507 self.handshake_stage_cancel_drops.saturating_add(1);
1508 warn!(%addr, "dropping request2: legacy parse path disallowed");
1509 let newly_blocked = self.record_handshake_violation(
1510 addr,
1511 HandshakeViolation::ParseAnomalyDrop,
1512 now,
1513 );
1514 if newly_blocked {
1515 self.send_connection_banned(addr).await?;
1516 }
1517 return Ok(None);
1518 }
1519 }
1520 Request2ParsePath::AmbiguousPreferredNoCookie
1521 | Request2ParsePath::AmbiguousPreferredWithCookie => {
1522 self.request2_ambiguous_parse_hits =
1523 self.request2_ambiguous_parse_hits.saturating_add(1);
1524 if self.config.reject_ambiguous_request2 {
1525 self.request2_ambiguous_drops =
1526 self.request2_ambiguous_drops.saturating_add(1);
1527 self.handshake_stage_cancel_drops =
1528 self.handshake_stage_cancel_drops.saturating_add(1);
1529 warn!(%addr, "dropping request2: ambiguous parse path rejected");
1530 let newly_blocked = self.record_handshake_violation(
1531 addr,
1532 HandshakeViolation::ParseAnomalyDrop,
1533 now,
1534 );
1535 if newly_blocked {
1536 self.send_connection_banned(addr).await?;
1537 }
1538 return Ok(None);
1539 }
1540 }
1541 Request2ParsePath::StrictNoCookie | Request2ParsePath::StrictWithCookie => {}
1542 }
1543
1544 let local_server_addr = self.local_addr().unwrap_or(self.config.bind_addr);
1545 if !self.is_request2_server_addr_allowed(req2.server_addr, local_server_addr) {
1546 self.request2_server_addr_mismatch_drops =
1547 self.request2_server_addr_mismatch_drops.saturating_add(1);
1548 self.handshake_stage_cancel_drops =
1549 self.handshake_stage_cancel_drops.saturating_add(1);
1550 warn!(
1551 %addr,
1552 request_server_addr = %req2.server_addr,
1553 local_server_addr = %local_server_addr,
1554 "dropping request2: server_addr mismatch"
1555 );
1556 return Ok(None);
1557 }
1558
1559 let Some(pending) = self.pending_handshakes.get(&addr).copied() else {
1560 self.handshake_missing_req1_drops =
1561 self.handshake_missing_req1_drops.saturating_add(1);
1562 self.handshake_stage_cancel_drops =
1563 self.handshake_stage_cancel_drops.saturating_add(1);
1564 warn!(%addr, "dropping request2: missing pending request1");
1565 let newly_blocked = self.record_handshake_violation(
1566 addr,
1567 HandshakeViolation::MissingPendingReq1,
1568 now,
1569 );
1570 if newly_blocked {
1571 self.send_connection_banned(addr).await?;
1572 }
1573 return Ok(None);
1574 };
1575
1576 if pending.expires_at <= now {
1577 match pending.stage {
1578 PendingHandshakeStage::AwaitingRequest2 => {
1579 self.handshake_req1_req2_timeouts =
1580 self.handshake_req1_req2_timeouts.saturating_add(1);
1581 }
1582 PendingHandshakeStage::AwaitingConnectionRequest => {
1583 self.handshake_reply2_connect_timeouts =
1584 self.handshake_reply2_connect_timeouts.saturating_add(1);
1585 }
1586 }
1587 self.handshake_stage_cancel_drops =
1588 self.handshake_stage_cancel_drops.saturating_add(1);
1589 warn!(
1590 %addr,
1591 stage = ?pending.stage,
1592 "dropping request2: pending handshake timed out"
1593 );
1594 let violation = match pending.stage {
1595 PendingHandshakeStage::AwaitingRequest2 => {
1596 HandshakeViolation::Req1Req2Timeout
1597 }
1598 PendingHandshakeStage::AwaitingConnectionRequest => {
1599 HandshakeViolation::Reply2ConnectTimeout
1600 }
1601 };
1602 let newly_blocked = self.record_handshake_violation(addr, violation, now);
1603 self.close_session(addr);
1604 if newly_blocked {
1605 self.send_connection_banned(addr).await?;
1606 }
1607 return Ok(None);
1608 }
1609
1610 if self.config.send_cookie
1611 && (pending.cookie.is_none() || !self.verify_cookie(addr, req2.cookie))
1612 {
1613 self.cookie_mismatch_drops = self.cookie_mismatch_drops.saturating_add(1);
1614 self.handshake_stage_cancel_drops =
1615 self.handshake_stage_cancel_drops.saturating_add(1);
1616 warn!(%addr, "dropping request2: cookie verification failed");
1617 let blocked_by_guard = self.record_cookie_mismatch(addr.ip(), now);
1618 let blocked_by_heuristics = self.record_handshake_violation(
1619 addr,
1620 HandshakeViolation::CookieMismatch,
1621 now,
1622 );
1623 if blocked_by_guard || blocked_by_heuristics {
1624 self.send_connection_banned(addr).await?;
1625 }
1626 return Ok(None);
1627 }
1628 self.clear_cookie_mismatch_state(addr.ip());
1629
1630 if pending.stage == PendingHandshakeStage::AwaitingConnectionRequest {
1631 let retry_mtu = self.negotiate_mtu(req2.mtu.min(pending.mtu));
1632 let Some(session) = self.sessions.get_mut(&addr) else {
1633 self.record_illegal_state_transition(addr);
1634 return Ok(None);
1635 };
1636 if !is_post_reply2_handshake_state(session.state()) {
1637 self.record_illegal_state_transition(addr);
1638 return Ok(None);
1639 }
1640 session.set_mtu(retry_mtu as usize);
1641 session.touch_activity(now);
1642
1643 let reply = OfflinePacket::OpenConnectionReply2(OpenConnectionReply2 {
1644 server_guid: self.config.server_guid,
1645 server_addr: local_server_addr,
1646 mtu: retry_mtu,
1647 use_encryption: false,
1648 magic: self.config.unconnected_magic,
1649 });
1650 self.send_offline_packet(addr, &reply).await?;
1651 return Ok(None);
1652 }
1653
1654 if self.would_exceed_session_limit(addr) {
1655 warn!(%addr, "rejecting request2: session limit reached");
1656 self.send_no_free_incoming_connections(addr).await?;
1657 return Ok(Some(TransportEvent::SessionLimitReached { addr }));
1658 }
1659
1660 let mtu = self.negotiate_mtu(req2.mtu.min(pending.mtu));
1661 let Some(session) = self.sessions.get_mut(&addr) else {
1662 self.record_illegal_state_transition(addr);
1663 return Ok(None);
1664 };
1665 session.set_mtu(mtu as usize);
1666 session.touch_activity(now);
1667 if !Self::apply_session_transitions(
1668 session,
1669 &[SessionState::Req2Recv, SessionState::Reply2Sent],
1670 ) {
1671 self.record_illegal_state_transition(addr);
1672 return Ok(None);
1673 }
1674 self.pending_handshakes.insert(
1675 addr,
1676 PendingHandshake {
1677 mtu,
1678 cookie: pending.cookie,
1679 client_guid: Some(req2.client_guid),
1680 stage: PendingHandshakeStage::AwaitingConnectionRequest,
1681 expires_at: now + self.config.handshake_reply2_connect_timeout(),
1682 },
1683 );
1684 let reply = OfflinePacket::OpenConnectionReply2(OpenConnectionReply2 {
1685 server_guid: self.config.server_guid,
1686 server_addr: local_server_addr,
1687 mtu,
1688 use_encryption: false,
1689 magic: self.config.unconnected_magic,
1690 });
1691 self.send_offline_packet(addr, &reply).await?;
1692 }
1693 _ => {}
1694 }
1695
1696 Ok(None)
1697 }
1698
1699 fn apply_connected_control(
1700 session: &mut Session,
1701 addr: SocketAddr,
1702 server_addr: SocketAddr,
1703 now_millis: i64,
1704 packet: ConnectedControlPacket,
1705 ) -> Result<ControlAction, DecodeError> {
1706 match packet {
1707 ConnectedControlPacket::ConnectedPing(ping) => {
1708 if session.state() != SessionState::Connected {
1709 return Ok(ControlAction::CloseSession {
1710 remote_reason: None,
1711 illegal_state: true,
1712 });
1713 }
1714 let pong = ConnectedControlPacket::ConnectedPong(ConnectedPong {
1715 ping_time: ping.ping_time,
1716 pong_time: now_millis,
1717 });
1718 let queued = Self::queue_connected_control_packet(
1719 session,
1720 pong,
1721 Reliability::Unreliable,
1722 0,
1723 RakPriority::Immediate,
1724 )?;
1725 if matches!(queued, QueuePayloadResult::DisconnectRequested)
1726 || session.take_backpressure_disconnect()
1727 {
1728 return Ok(ControlAction::CloseSession {
1729 remote_reason: None,
1730 illegal_state: false,
1731 });
1732 }
1733 }
1734 ConnectedControlPacket::ConnectionRequest(request) => {
1735 if !Self::apply_session_transitions(session, &[SessionState::ConnReqRecv]) {
1736 return Ok(ControlAction::CloseSession {
1737 remote_reason: None,
1738 illegal_state: true,
1739 });
1740 }
1741
1742 let accepted =
1743 ConnectedControlPacket::ConnectionRequestAccepted(ConnectionRequestAccepted {
1744 client_addr: addr,
1745 system_index: 0,
1746 internal_addrs: build_internal_addrs(server_addr),
1747 request_time: request.request_time,
1748 accepted_time: now_millis,
1749 });
1750
1751 let queued = Self::queue_connected_control_packet(
1752 session,
1753 accepted,
1754 Reliability::ReliableOrdered,
1755 0,
1756 RakPriority::High,
1757 )?;
1758 if !matches!(queued, QueuePayloadResult::Enqueued { .. }) {
1759 return Ok(ControlAction::CloseSession {
1760 remote_reason: None,
1761 illegal_state: false,
1762 });
1763 }
1764 if session.take_backpressure_disconnect() {
1765 return Ok(ControlAction::CloseSession {
1766 remote_reason: None,
1767 illegal_state: false,
1768 });
1769 }
1770
1771 if !Self::apply_session_transitions(session, &[SessionState::ConnReqAcceptedSent]) {
1772 return Ok(ControlAction::CloseSession {
1773 remote_reason: None,
1774 illegal_state: true,
1775 });
1776 }
1777 }
1778 ConnectedControlPacket::NewIncomingConnection(_) => {
1779 if !Self::apply_session_transitions(
1780 session,
1781 &[SessionState::NewIncomingRecv, SessionState::Connected],
1782 ) {
1783 return Ok(ControlAction::CloseSession {
1784 remote_reason: None,
1785 illegal_state: true,
1786 });
1787 }
1788 }
1789 ConnectedControlPacket::DisconnectionNotification(pkt) => {
1790 if !Self::apply_session_transitions(
1791 session,
1792 &[SessionState::Closing, SessionState::Closed],
1793 ) {
1794 return Ok(ControlAction::CloseSession {
1795 remote_reason: None,
1796 illegal_state: true,
1797 });
1798 }
1799 return Ok(ControlAction::CloseSession {
1800 remote_reason: Some(RemoteDisconnectReason::DisconnectionNotification {
1801 reason_code: pkt.reason,
1802 }),
1803 illegal_state: false,
1804 });
1805 }
1806 ConnectedControlPacket::DetectLostConnection(_) => {
1807 if !Self::apply_session_transitions(
1808 session,
1809 &[SessionState::Closing, SessionState::Closed],
1810 ) {
1811 return Ok(ControlAction::CloseSession {
1812 remote_reason: None,
1813 illegal_state: true,
1814 });
1815 }
1816 return Ok(ControlAction::CloseSession {
1817 remote_reason: Some(RemoteDisconnectReason::DetectLostConnection),
1818 illegal_state: false,
1819 });
1820 }
1821 ConnectedControlPacket::ConnectedPong(_)
1822 | ConnectedControlPacket::ConnectionRequestAccepted(_) => {}
1823 }
1824
1825 Ok(ControlAction::None)
1826 }
1827
1828 fn apply_session_transitions(session: &mut Session, transitions: &[SessionState]) -> bool {
1829 for &next in transitions {
1830 if !session.transition_to(next) {
1831 return false;
1832 }
1833 }
1834 true
1835 }
1836
1837 fn queue_connected_control_packet(
1838 session: &mut Session,
1839 packet: ConnectedControlPacket,
1840 reliability: Reliability,
1841 channel: u8,
1842 priority: RakPriority,
1843 ) -> Result<QueuePayloadResult, DecodeError> {
1844 let mut bytes = BytesMut::new();
1845 packet
1846 .encode(&mut bytes)
1847 .map_err(|_| DecodeError::UnexpectedEof)?;
1848 let payload = bytes.freeze();
1849 Ok(session.queue_payload(payload, reliability, channel, priority))
1850 }
1851
1852 fn prune_pending_handshakes(&mut self, now: Instant) {
1853 let mut expired = Vec::new();
1854 for (addr, pending) in &self.pending_handshakes {
1855 if pending.expires_at <= now {
1856 expired.push((*addr, *pending));
1857 }
1858 }
1859
1860 for (addr, pending) in expired {
1861 let violation = match pending.stage {
1862 PendingHandshakeStage::AwaitingRequest2 => {
1863 self.handshake_req1_req2_timeouts =
1864 self.handshake_req1_req2_timeouts.saturating_add(1);
1865 HandshakeViolation::Req1Req2Timeout
1866 }
1867 PendingHandshakeStage::AwaitingConnectionRequest => {
1868 self.handshake_reply2_connect_timeouts =
1869 self.handshake_reply2_connect_timeouts.saturating_add(1);
1870 HandshakeViolation::Reply2ConnectTimeout
1871 }
1872 };
1873 warn!(
1874 %addr,
1875 stage = ?pending.stage,
1876 "pending handshake expired and session is being closed"
1877 );
1878 self.handshake_stage_cancel_drops = self.handshake_stage_cancel_drops.saturating_add(1);
1879 let _ = self.record_handshake_violation(addr, violation, now);
1880 self.close_session(addr);
1881 }
1882
1883 self.prune_handshake_heuristic_states(now);
1884 }
1885
1886 fn clear_cookie_mismatch_state(&mut self, ip: IpAddr) {
1887 self.cookie_mismatch_guard_states.remove(&ip);
1888 }
1889
1890 fn record_cookie_mismatch(&mut self, ip: IpAddr, now: Instant) -> bool {
1891 let guard = self.config.cookie_mismatch_guard;
1892 if !guard.enabled
1893 || guard.event_window.is_zero()
1894 || guard.block_duration.is_zero()
1895 || guard.mismatch_threshold == 0
1896 {
1897 return false;
1898 }
1899
1900 let state =
1901 self.cookie_mismatch_guard_states
1902 .entry(ip)
1903 .or_insert(CookieMismatchGuardState {
1904 window_started_at: now,
1905 mismatches: 0,
1906 });
1907
1908 if now.saturating_duration_since(state.window_started_at) > guard.event_window {
1909 state.window_started_at = now;
1910 state.mismatches = 0;
1911 }
1912
1913 state.mismatches = state.mismatches.saturating_add(1);
1914 if state.mismatches < guard.mismatch_threshold {
1915 return false;
1916 }
1917
1918 state.window_started_at = now;
1919 state.mismatches = 0;
1920
1921 let newly_blocked = self.rate_limiter.block_address_for_with_reason(
1922 ip,
1923 now,
1924 guard.block_duration,
1925 BlockReason::CookieMismatchGuard,
1926 );
1927 if newly_blocked {
1928 self.cookie_mismatch_blocks = self.cookie_mismatch_blocks.saturating_add(1);
1929 warn!(
1930 %ip,
1931 mismatches_required = guard.mismatch_threshold,
1932 block_secs = guard.block_duration.as_secs(),
1933 "cookie mismatch guard blocked address"
1934 );
1935 }
1936 newly_blocked
1937 }
1938
1939 fn prune_cookie_mismatch_guard_states(&mut self, now: Instant) {
1940 let guard = self.config.cookie_mismatch_guard;
1941 if !guard.enabled || guard.event_window.is_zero() {
1942 self.cookie_mismatch_guard_states.clear();
1943 return;
1944 }
1945
1946 self.cookie_mismatch_guard_states.retain(|_, state| {
1947 now.saturating_duration_since(state.window_started_at) <= guard.event_window
1948 });
1949 }
1950
1951 fn record_handshake_violation(
1952 &mut self,
1953 addr: SocketAddr,
1954 violation: HandshakeViolation,
1955 now: Instant,
1956 ) -> bool {
1957 let h = self.config.handshake_heuristics;
1958 if !h.enabled
1959 || h.score_threshold == 0
1960 || h.event_window.is_zero()
1961 || h.block_duration.is_zero()
1962 {
1963 return false;
1964 }
1965
1966 let points = violation.score(&self.config);
1967 if points == 0 {
1968 return false;
1969 }
1970 debug!(%addr, ?violation, points, "recorded handshake violation");
1971
1972 let mut should_block = false;
1973 {
1974 let entry =
1975 self.handshake_heuristics
1976 .entry(addr.ip())
1977 .or_insert(HandshakeHeuristicState {
1978 window_started_at: now,
1979 score: 0,
1980 });
1981
1982 if now.saturating_duration_since(entry.window_started_at) > h.event_window {
1983 entry.window_started_at = now;
1984 entry.score = 0;
1985 }
1986
1987 entry.score = entry.score.saturating_add(points);
1988 if entry.score >= h.score_threshold {
1989 entry.window_started_at = now;
1990 entry.score = 0;
1991 should_block = true;
1992 }
1993 }
1994
1995 if !should_block {
1996 return false;
1997 }
1998
1999 let newly_blocked = self.rate_limiter.block_address_for_with_reason(
2000 addr.ip(),
2001 now,
2002 h.block_duration,
2003 BlockReason::HandshakeHeuristic,
2004 );
2005 if newly_blocked {
2006 self.handshake_auto_blocks = self.handshake_auto_blocks.saturating_add(1);
2007 warn!(
2008 %addr,
2009 ?violation,
2010 block_secs = h.block_duration.as_secs(),
2011 "handshake heuristic blocked address"
2012 );
2013 }
2014 newly_blocked
2015 }
2016
2017 fn prune_handshake_heuristic_states(&mut self, now: Instant) {
2018 let h = self.config.handshake_heuristics;
2019 if !h.enabled || h.event_window.is_zero() {
2020 self.handshake_heuristics.clear();
2021 return;
2022 }
2023
2024 self.handshake_heuristics.retain(|_, state| {
2025 now.saturating_duration_since(state.window_started_at) <= h.event_window
2026 });
2027 }
2028
2029 fn close_session(&mut self, addr: SocketAddr) {
2030 self.pending_handshakes.remove(&addr);
2031 if let Some(session) = self.sessions.remove(&addr) {
2032 let state = session.state();
2033 if should_count_closed_session(state) {
2034 self.sessions_closed_total = self.sessions_closed_total.saturating_add(1);
2035 }
2036 if should_mark_ip_recently_connected(state) {
2037 self.mark_ip_recently_connected(addr, Instant::now());
2038 }
2039 }
2040 self.session_pipelines.remove(&addr);
2041 }
2042
2043 fn has_active_session_for_offline_reject(&self, addr: SocketAddr) -> bool {
2044 let Some(session) = self.sessions.get(&addr) else {
2045 return false;
2046 };
2047 if self.pending_handshakes.contains_key(&addr) {
2048 return false;
2049 }
2050 matches!(
2051 session.state(),
2052 SessionState::Req2Recv
2053 | SessionState::Reply2Sent
2054 | SessionState::ConnReqRecv
2055 | SessionState::ConnReqAcceptedSent
2056 | SessionState::NewIncomingRecv
2057 | SessionState::Connected
2058 | SessionState::Closing
2059 | SessionState::Closed
2060 )
2061 }
2062
2063 fn is_request2_server_addr_allowed(
2064 &self,
2065 request_server_addr: SocketAddr,
2066 local_server_addr: SocketAddr,
2067 ) -> bool {
2068 match self.config.request2_server_addr_policy {
2069 Request2ServerAddrPolicy::Disabled => true,
2070 Request2ServerAddrPolicy::PortOnly => {
2071 request_server_addr.port() == local_server_addr.port()
2072 }
2073 Request2ServerAddrPolicy::Exact => {
2074 if local_server_addr.ip().is_unspecified() {
2075 request_server_addr.port() == local_server_addr.port()
2076 } else {
2077 request_server_addr == local_server_addr
2078 }
2079 }
2080 }
2081 }
2082
2083 fn is_ip_recently_connected(&mut self, addr: SocketAddr, now: Instant) -> bool {
2084 self.prune_ip_recently_connected(now);
2085 self.ip_recently_connected_until
2086 .get(&addr.ip())
2087 .is_some_and(|until| *until > now)
2088 }
2089
2090 fn mark_ip_recently_connected(&mut self, addr: SocketAddr, now: Instant) {
2091 let window = self.config.ip_recently_connected_window;
2092 if window.is_zero() {
2093 return;
2094 }
2095 self.ip_recently_connected_until
2096 .insert(addr.ip(), now + window);
2097 }
2098
2099 fn prune_ip_recently_connected(&mut self, now: Instant) {
2100 if self.config.ip_recently_connected_window.is_zero() {
2101 self.ip_recently_connected_until.clear();
2102 return;
2103 }
2104 self.ip_recently_connected_until
2105 .retain(|_, until| *until > now);
2106 }
2107
2108 fn record_remote_disconnect(&mut self, reason: RemoteDisconnectReason) {
2109 match reason {
2110 RemoteDisconnectReason::DisconnectionNotification { .. } => {
2111 self.remote_disconnect_notifications =
2112 self.remote_disconnect_notifications.saturating_add(1);
2113 }
2114 RemoteDisconnectReason::DetectLostConnection => {
2115 self.remote_detect_lost_disconnects =
2116 self.remote_detect_lost_disconnects.saturating_add(1);
2117 }
2118 }
2119 }
2120
2121 fn prune_idle_sessions(&mut self, now: Instant, protected_addr: Option<SocketAddr>) {
2122 let timeout = self.config.session_idle_timeout;
2123 if timeout.is_zero() {
2124 return;
2125 }
2126
2127 let mut stale = Vec::new();
2128 for (addr, session) in &self.sessions {
2129 if Some(*addr) == protected_addr {
2130 continue;
2131 }
2132 if session.state() != SessionState::Connected {
2133 continue;
2134 }
2135 if session.idle_for(now) >= timeout {
2136 stale.push(*addr);
2137 }
2138 }
2139
2140 for addr in stale {
2141 self.timed_out_sessions = self.timed_out_sessions.saturating_add(1);
2142 self.close_session(addr);
2143 }
2144 }
2145
2146 fn queue_keepalive_pings(&mut self, now: Instant) {
2147 let interval = self.config.session_keepalive_interval;
2148 if interval.is_zero() {
2149 return;
2150 }
2151
2152 let mut close_addrs = Vec::new();
2153 for (addr, session) in &mut self.sessions {
2154 if !session.should_send_keepalive(now, interval) {
2155 continue;
2156 }
2157
2158 let ping = ConnectedControlPacket::ConnectedPing(ConnectedPing {
2159 ping_time: unix_timestamp_millis(),
2160 });
2161
2162 if matches!(
2163 Self::queue_connected_control_packet(
2164 session,
2165 ping,
2166 Reliability::Unreliable,
2167 0,
2168 RakPriority::Low,
2169 ),
2170 Ok(QueuePayloadResult::Enqueued { .. })
2171 ) {
2172 session.mark_keepalive_sent(now);
2173 self.keepalive_pings_sent = self.keepalive_pings_sent.saturating_add(1);
2174 }
2175
2176 if session.take_backpressure_disconnect() {
2177 close_addrs.push(*addr);
2178 }
2179 }
2180
2181 for addr in close_addrs {
2182 self.close_session(addr);
2183 }
2184 }
2185
2186 fn record_illegal_state_transition(&mut self, addr: SocketAddr) {
2187 self.illegal_state_transitions = self.illegal_state_transitions.saturating_add(1);
2188 self.close_session(addr);
2189 }
2190
2191 fn would_exceed_session_limit(&self, addr: SocketAddr) -> bool {
2192 !self.sessions.contains_key(&addr) && self.sessions.len() >= self.config.max_sessions
2193 }
2194
2195 fn negotiate_mtu(&self, requested: u16) -> u16 {
2196 let server_cap =
2197 self.config
2198 .mtu
2199 .clamp(MINIMUM_MTU_SIZE as usize, MAXIMUM_MTU_SIZE as usize) as u16;
2200 requested.clamp(MINIMUM_MTU_SIZE, server_cap)
2201 }
2202
2203 fn rotate_cookie_keys_if_needed(&mut self, now: Instant) {
2204 if !self.config.send_cookie || self.config.cookie_rotation_interval.is_zero() {
2205 return;
2206 }
2207 if now < self.next_cookie_rotation {
2208 return;
2209 }
2210
2211 let next_key = SecretCookieKey::new(random_cookie_key());
2212 let previous_key = std::mem::replace(&mut self.cookie_key_current, next_key);
2213 self.cookie_key_previous = Some(previous_key);
2214 self.next_cookie_rotation = now + self.config.cookie_rotation_interval;
2215 self.cookie_rotations = self.cookie_rotations.saturating_add(1);
2216 }
2217
2218 fn generate_cookie(&self, addr: SocketAddr) -> u32 {
2219 self.compute_cookie_for_key(addr, self.cookie_key_current.as_ref())
2220 }
2221
2222 fn verify_cookie(&self, addr: SocketAddr, cookie: Option<u32>) -> bool {
2223 let Some(cookie) = cookie else {
2224 return false;
2225 };
2226
2227 if self.compute_cookie_for_key(addr, self.cookie_key_current.as_ref()) == cookie {
2228 return true;
2229 }
2230
2231 if let Some(previous_key) = self.cookie_key_previous.as_ref()
2232 && self.compute_cookie_for_key(addr, previous_key.as_ref()) == cookie
2233 {
2234 return true;
2235 }
2236
2237 false
2238 }
2239
2240 fn compute_cookie_for_key(&self, addr: SocketAddr, key: &[u8]) -> u32 {
2241 let mut mac = HmacSha256::new_from_slice(key).expect("HMAC supports arbitrary key lengths");
2242 update_mac_with_socket_addr(&mut mac, addr);
2243 update_mac_with_socket_addr(&mut mac, self.config.bind_addr);
2244 mac.update(&self.config.server_guid.to_le_bytes());
2245 let tag = mac.finalize().into_bytes();
2246 u32::from_le_bytes([tag[0], tag[1], tag[2], tag[3]])
2247 }
2248}
2249
/// Wraps any displayable error into an `io::Error` of kind `InvalidData`,
/// using the error's `Display` output as the message.
fn invalid_data_io_error<E: std::fmt::Display>(error: E) -> io::Error {
    let message = error.to_string();
    io::Error::new(io::ErrorKind::InvalidData, message)
}
2253
2254fn supports_protocol(supported: &[u8], incoming: u8) -> bool {
2255 if supported.is_empty() {
2256 return incoming == RAKNET_PROTOCOL_VERSION;
2257 }
2258 supported.contains(&incoming)
2259}
2260
2261fn primary_protocol_version(supported: &[u8]) -> u8 {
2262 supported
2263 .iter()
2264 .copied()
2265 .max()
2266 .unwrap_or(RAKNET_PROTOCOL_VERSION)
2267}
2268
2269fn random_cookie_key() -> [u8; COOKIE_KEY_LEN] {
2270 let mut key = [0u8; COOKIE_KEY_LEN];
2271 if getrandom::getrandom(&mut key).is_ok() {
2272 return key;
2273 }
2274
2275 let fallback = unix_timestamp_millis() as u64;
2276 for (idx, chunk) in key.chunks_exact_mut(8).enumerate() {
2277 let seed = fallback
2278 .wrapping_mul((idx as u64).saturating_add(1))
2279 .rotate_left((idx as u32).saturating_mul(11));
2280 chunk.copy_from_slice(&seed.to_le_bytes());
2281 }
2282 key
2283}
2284
2285fn invalid_config_io_error(error: crate::error::ConfigValidationError) -> io::Error {
2286 io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
2287}
2288
2289#[cfg(any(target_os = "linux", target_os = "android"))]
2290fn set_df_path_mtu_discovery_v4(socket: &Socket) -> io::Result<()> {
2291 let mode: libc::c_int = libc::IP_PMTUDISC_DO;
2292 let rc = unsafe {
2293 libc::setsockopt(
2294 socket.as_raw_fd(),
2295 libc::IPPROTO_IP,
2296 libc::IP_MTU_DISCOVER,
2297 &mode as *const libc::c_int as *const libc::c_void,
2298 mem::size_of::<libc::c_int>() as libc::socklen_t,
2299 )
2300 };
2301 if rc == 0 {
2302 return Ok(());
2303 }
2304 Err(io::Error::last_os_error())
2305}
2306
/// Fallback for platforms other than Linux/Android: always fails with
/// `ErrorKind::Unsupported`.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
fn set_df_path_mtu_discovery_v4(_socket: &Socket) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "disable_ip_fragmentation is unsupported for IPv4 on this platform",
    );
    Err(unsupported)
}
2314
2315#[cfg(any(target_os = "linux", target_os = "android"))]
2316fn set_df_path_mtu_discovery_v6(socket: &Socket) -> io::Result<()> {
2317 let mode: libc::c_int = libc::IPV6_PMTUDISC_DO;
2318 let rc = unsafe {
2319 libc::setsockopt(
2320 socket.as_raw_fd(),
2321 libc::IPPROTO_IPV6,
2322 libc::IPV6_MTU_DISCOVER,
2323 &mode as *const libc::c_int as *const libc::c_void,
2324 mem::size_of::<libc::c_int>() as libc::socklen_t,
2325 )
2326 };
2327 if rc == 0 {
2328 return Ok(());
2329 }
2330 Err(io::Error::last_os_error())
2331}
2332
/// Fallback for platforms other than Linux/Android: always fails with
/// `ErrorKind::Unsupported`.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
fn set_df_path_mtu_discovery_v6(_socket: &Socket) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "disable_ip_fragmentation is unsupported for IPv6 on this platform",
    );
    Err(unsupported)
}
2340
2341fn update_mac_with_socket_addr(mac: &mut HmacSha256, addr: SocketAddr) {
2342 match addr {
2343 SocketAddr::V4(v4) => {
2344 mac.update(&[4]);
2345 mac.update(&v4.ip().octets());
2346 mac.update(&v4.port().to_le_bytes());
2347 }
2348 SocketAddr::V6(v6) => {
2349 mac.update(&[6]);
2350 mac.update(&v6.ip().octets());
2351 mac.update(&v6.port().to_le_bytes());
2352 mac.update(&v6.flowinfo().to_le_bytes());
2353 mac.update(&v6.scope_id().to_le_bytes());
2354 }
2355 }
2356}
2357
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The value is capped at `i64::MAX`; a clock that reads before the epoch
/// yields `0`.
fn unix_timestamp_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0)
}
2364
2365fn build_internal_addrs(server_addr: SocketAddr) -> [SocketAddr; SYSTEM_ADDRESS_COUNT] {
2366 let fallback = match server_addr {
2367 SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
2368 SocketAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(
2369 Ipv6Addr::UNSPECIFIED,
2370 0,
2371 0,
2372 v6.scope_id(),
2373 )),
2374 };
2375
2376 let mut addrs = [fallback; SYSTEM_ADDRESS_COUNT];
2377 addrs[0] = server_addr;
2378 addrs
2379}
2380
2381fn estimate_connected_datagram_processing_cost(raw_len: usize, datagram: &Datagram) -> usize {
2382 match &datagram.payload {
2383 DatagramPayload::Ack(payload) | DatagramPayload::Nack(payload) => {
2384 payload.ranges.len().saturating_mul(8).max(1)
2385 }
2386 DatagramPayload::Frames(frames) => {
2387 let mut cost = raw_len.max(1);
2388 cost = cost.saturating_add(frames.len().saturating_mul(48));
2389
2390 for frame in frames {
2391 let payload_len = frame.payload.len();
2392 cost = cost.saturating_add(payload_len);
2393
2394 if frame.header.reliability.is_reliable() {
2395 cost = cost.saturating_add(32);
2396 }
2397 if frame.header.reliability.is_ordered() || frame.header.reliability.is_sequenced()
2398 {
2399 cost = cost.saturating_add(24);
2400 }
2401 if frame.header.is_split {
2402 let split_parts = frame
2403 .split
2404 .as_ref()
2405 .map(|s| s.part_count as usize)
2406 .unwrap_or(1);
2407 cost = cost.saturating_add(512);
2408 cost = cost.saturating_add(payload_len.saturating_mul(2));
2409 cost = cost.saturating_add(split_parts.min(2_048));
2410 }
2411 }
2412
2413 cost.max(1)
2414 }
2415 }
2416}
2417
/// Reports whether `id` is one of the packet ids this transport treats as an
/// offline (unconnected) packet.
fn is_offline_packet_id(id: u8) -> bool {
    const OFFLINE_PACKET_IDS: [u8; 13] = [
        0x01, 0x02, 0x05, 0x06, 0x07, 0x08, 0x11, 0x12, 0x14, 0x17, 0x19, 0x1A, 0x1C,
    ];
    OFFLINE_PACKET_IDS.contains(&id)
}
2424
/// Reports whether `id` is one of the packet ids this transport treats as a
/// connected control packet.
fn is_connected_control_id(id: u8) -> bool {
    const CONNECTED_CONTROL_IDS: [u8; 7] = [0x00, 0x03, 0x04, 0x09, 0x10, 0x13, 0x15];
    CONNECTED_CONTROL_IDS.contains(&id)
}
2428
2429fn is_post_reply2_handshake_state(state: SessionState) -> bool {
2430 matches!(
2431 state,
2432 SessionState::Reply2Sent
2433 | SessionState::ConnReqRecv
2434 | SessionState::ConnReqAcceptedSent
2435 | SessionState::NewIncomingRecv
2436 )
2437}
2438
2439fn should_mark_ip_recently_connected(state: SessionState) -> bool {
2440 state == SessionState::Connected
2441 || state == SessionState::Closing
2442 || state == SessionState::Closed
2443 || is_post_reply2_handshake_state(state)
2444}
2445
2446fn should_count_closed_session(state: SessionState) -> bool {
2447 matches!(
2448 state,
2449 SessionState::Connected | SessionState::Closing | SessionState::Closed
2450 )
2451}
2452
2453#[cfg(test)]
2454mod tests {
2455 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6};
2456 use std::time::{Duration, Instant};
2457
2458 use bytes::{Bytes, BytesMut};
2459 use tokio::runtime::Builder;
2460
2461 use super::{
2462 ConnectedFrameDelivery, ControlAction, HandshakeViolation, PendingHandshake,
2463 PendingHandshakeStage, RemoteDisconnectReason, TransportEvent,
2464 TransportProcessingBudgetConfig, TransportRateLimitConfig, TransportServer,
2465 build_internal_addrs, is_connected_control_id, is_offline_packet_id,
2466 primary_protocol_version, supports_protocol,
2467 };
2468 use crate::error::DecodeError;
2469 use crate::handshake::{
2470 OfflinePacket, OpenConnectionRequest1, OpenConnectionRequest2, Request2ParsePath,
2471 };
2472 use crate::protocol::connected::{
2473 ConnectedControlPacket, ConnectedPing, ConnectionRequest, DetectLostConnection,
2474 DisconnectionNotification,
2475 };
2476 use crate::protocol::constants::{
2477 DEFAULT_UNCONNECTED_MAGIC, DatagramFlags, RAKNET_PROTOCOL_VERSION,
2478 };
2479 use crate::protocol::datagram::{Datagram, DatagramHeader, DatagramPayload};
2480 use crate::protocol::frame::Frame;
2481 use crate::protocol::frame_header::FrameHeader;
2482 use crate::protocol::reliability::Reliability;
2483 use crate::protocol::sequence24::Sequence24;
2484 use crate::session::tunables::SessionTunables;
2485 use crate::session::{QueuePayloadResult, RakPriority, Session, SessionState};
2486 use crate::transport::config::{
2487 CookieMismatchGuardConfig, HandshakeHeuristicsConfig, ProcessingBudgetConfig,
2488 Request2ServerAddrPolicy, TransportConfig, TransportSocketTuning,
2489 };
2490
2491 fn build_test_server(mut config: TransportConfig) -> TransportServer {
2492 let rt = Builder::new_current_thread()
2493 .enable_all()
2494 .build()
2495 .expect("runtime must build");
2496
2497 rt.block_on(async move {
2498 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
2499 .await
2500 .expect("bind should succeed");
2501 config.bind_addr = socket.local_addr().expect("local addr should be available");
2502 TransportServer::with_socket(config, socket).expect("server should build")
2503 })
2504 }
2505
2506 #[test]
2507 fn connected_frame_delivery_preserves_payload_and_metadata() {
2508 let payload = Bytes::from_static(b"\xfepayload");
2509 let frame = Frame {
2510 header: FrameHeader {
2511 reliability: Reliability::ReliableOrdered,
2512 is_split: false,
2513 needs_bas: false,
2514 },
2515 bit_length: (payload.len() as u16) << 3,
2516 reliable_index: Some(Sequence24::new(3)),
2517 sequence_index: None,
2518 ordering_index: Some(Sequence24::new(8)),
2519 ordering_channel: Some(2),
2520 split: None,
2521 payload: payload.clone(),
2522 };
2523
2524 let delivered = ConnectedFrameDelivery::from_frame(frame);
2525 assert_eq!(delivered.payload, payload);
2526 assert_eq!(delivered.reliability, Reliability::ReliableOrdered);
2527 assert_eq!(delivered.reliable_index, Some(Sequence24::new(3)));
2528 assert_eq!(delivered.ordering_index, Some(Sequence24::new(8)));
2529 assert_eq!(delivered.ordering_channel, Some(2));
2530 assert!(delivered.sequence_index.is_none());
2531 }
2532
2533 #[test]
2534 fn apply_session_transitions_accepts_valid_path() {
2535 let mut session = Session::new(1492);
2536 let ok = TransportServer::apply_session_transitions(
2537 &mut session,
2538 &[
2539 SessionState::Req1Recv,
2540 SessionState::Reply1Sent,
2541 SessionState::Req2Recv,
2542 SessionState::Reply2Sent,
2543 SessionState::ConnReqRecv,
2544 SessionState::ConnReqAcceptedSent,
2545 SessionState::NewIncomingRecv,
2546 SessionState::Connected,
2547 ],
2548 );
2549 assert!(ok);
2550 assert_eq!(session.state(), SessionState::Connected);
2551 }
2552
2553 #[test]
2554 fn apply_session_transitions_rejects_invalid_jump() {
2555 let mut session = Session::new(1492);
2556 let ok =
2557 TransportServer::apply_session_transitions(&mut session, &[SessionState::ConnReqRecv]);
2558 assert!(!ok);
2559 assert_eq!(session.state(), SessionState::Offline);
2560 }
2561
2562 #[test]
2563 fn connected_ping_before_connected_state_is_rejected() {
2564 let mut session = Session::new(1492);
2565 let result = TransportServer::apply_connected_control(
2566 &mut session,
2567 "127.0.0.1:19132"
2568 .parse::<SocketAddr>()
2569 .expect("valid socket addr"),
2570 "127.0.0.1:19132"
2571 .parse::<SocketAddr>()
2572 .expect("valid socket addr"),
2573 123,
2574 ConnectedControlPacket::ConnectedPing(ConnectedPing { ping_time: 11 }),
2575 );
2576
2577 assert!(matches!(
2578 result,
2579 Ok(ControlAction::CloseSession {
2580 remote_reason: None,
2581 illegal_state: true,
2582 })
2583 ));
2584 assert_eq!(session.state(), SessionState::Offline);
2585 }
2586
2587 #[test]
2588 fn disconnection_notification_exposes_remote_reason_code() {
2589 let mut session = Session::new(1492);
2590 assert!(TransportServer::apply_session_transitions(
2591 &mut session,
2592 &[
2593 SessionState::Req1Recv,
2594 SessionState::Reply1Sent,
2595 SessionState::Req2Recv,
2596 SessionState::Reply2Sent,
2597 SessionState::ConnReqRecv,
2598 SessionState::ConnReqAcceptedSent,
2599 SessionState::NewIncomingRecv,
2600 SessionState::Connected,
2601 ],
2602 ));
2603
2604 let result = TransportServer::apply_connected_control(
2605 &mut session,
2606 "127.0.0.1:19132"
2607 .parse::<SocketAddr>()
2608 .expect("valid socket addr"),
2609 "127.0.0.1:19132"
2610 .parse::<SocketAddr>()
2611 .expect("valid socket addr"),
2612 123,
2613 ConnectedControlPacket::DisconnectionNotification(DisconnectionNotification {
2614 reason: Some(7),
2615 }),
2616 );
2617
2618 assert!(matches!(
2619 result,
2620 Ok(ControlAction::CloseSession {
2621 remote_reason: Some(RemoteDisconnectReason::DisconnectionNotification {
2622 reason_code: Some(7)
2623 }),
2624 illegal_state: false,
2625 })
2626 ));
2627 assert_eq!(session.state(), SessionState::Closed);
2628 }
2629
2630 #[test]
2631 fn detect_lost_connection_is_reported_as_remote_disconnect() {
2632 let mut session = Session::new(1492);
2633 assert!(TransportServer::apply_session_transitions(
2634 &mut session,
2635 &[
2636 SessionState::Req1Recv,
2637 SessionState::Reply1Sent,
2638 SessionState::Req2Recv,
2639 SessionState::Reply2Sent,
2640 SessionState::ConnReqRecv,
2641 SessionState::ConnReqAcceptedSent,
2642 SessionState::NewIncomingRecv,
2643 SessionState::Connected,
2644 ],
2645 ));
2646
2647 let result = TransportServer::apply_connected_control(
2648 &mut session,
2649 "127.0.0.1:19132"
2650 .parse::<SocketAddr>()
2651 .expect("valid socket addr"),
2652 "127.0.0.1:19132"
2653 .parse::<SocketAddr>()
2654 .expect("valid socket addr"),
2655 123,
2656 ConnectedControlPacket::DetectLostConnection(DetectLostConnection),
2657 );
2658
2659 assert!(matches!(
2660 result,
2661 Ok(ControlAction::CloseSession {
2662 remote_reason: Some(RemoteDisconnectReason::DetectLostConnection),
2663 illegal_state: false,
2664 })
2665 ));
2666 assert_eq!(session.state(), SessionState::Closed);
2667 }
2668
2669 #[test]
2670 fn connection_request_after_reply2_is_accepted() {
2671 let mut session = Session::new(1492);
2672 assert!(TransportServer::apply_session_transitions(
2673 &mut session,
2674 &[
2675 SessionState::Req1Recv,
2676 SessionState::Reply1Sent,
2677 SessionState::Req2Recv,
2678 SessionState::Reply2Sent,
2679 ],
2680 ));
2681
2682 let result = TransportServer::apply_connected_control(
2683 &mut session,
2684 "127.0.0.1:19132"
2685 .parse::<SocketAddr>()
2686 .expect("valid socket addr"),
2687 "127.0.0.1:19132"
2688 .parse::<SocketAddr>()
2689 .expect("valid socket addr"),
2690 123,
2691 ConnectedControlPacket::ConnectionRequest(ConnectionRequest {
2692 client_guid: 42,
2693 request_time: 77,
2694 use_encryption: false,
2695 }),
2696 );
2697
2698 assert!(matches!(result, Ok(ControlAction::None)));
2699 assert_eq!(session.state(), SessionState::ConnReqAcceptedSent);
2700 }
2701
2702 #[test]
2703 fn protocol_support_accepts_configured_versions() {
2704 let versions = [10, 11, 12];
2705 assert!(supports_protocol(&versions, 10));
2706 assert!(supports_protocol(&versions, 12));
2707 assert!(!supports_protocol(&versions, 9));
2708 assert!(supports_protocol(&[], RAKNET_PROTOCOL_VERSION));
2709 assert!(!supports_protocol(
2710 &[],
2711 RAKNET_PROTOCOL_VERSION.saturating_sub(1)
2712 ));
2713 }
2714
2715 #[test]
2716 fn primary_protocol_version_uses_highest_configured_version() {
2717 let versions = [11, 13, 12];
2718 assert_eq!(primary_protocol_version(&versions), 13);
2719 assert_eq!(primary_protocol_version(&[]), RAKNET_PROTOCOL_VERSION);
2720 }
2721
2722 #[test]
2723 fn cookie_rotation_keeps_previous_key_valid_temporarily() {
2724 let config = TransportConfig {
2725 send_cookie: true,
2726 cookie_rotation_interval: Duration::from_secs(1),
2727 ..TransportConfig::default()
2728 };
2729 let mut server = build_test_server(config);
2730 let addr = "127.0.0.1:19132"
2731 .parse::<SocketAddr>()
2732 .expect("valid socket addr");
2733
2734 let mut original_key = [0u8; super::COOKIE_KEY_LEN];
2735 original_key.copy_from_slice(server.cookie_key_current.as_ref());
2736 let old_cookie = server.generate_cookie(addr);
2737 server.rotate_cookie_keys_if_needed(Instant::now() + Duration::from_secs(2));
2738
2739 assert_eq!(server.cookie_rotations, 1);
2740 let previous_key = server.cookie_key_previous.as_ref().map(|key| {
2741 let mut value = [0u8; super::COOKIE_KEY_LEN];
2742 value.copy_from_slice(key.as_ref());
2743 value
2744 });
2745 assert_eq!(
2746 previous_key,
2747 Some(original_key),
2748 "rotating cookies must retain previous key for grace period"
2749 );
2750 assert!(server.verify_cookie(addr, Some(old_cookie)));
2751 }
2752
2753 #[test]
2754 fn cookie_is_bound_to_socket_address() {
2755 let config = TransportConfig {
2756 send_cookie: true,
2757 ..TransportConfig::default()
2758 };
2759 let server = build_test_server(config);
2760 let addr_a = "127.0.0.1:19132"
2761 .parse::<SocketAddr>()
2762 .expect("valid socket addr");
2763 let addr_b = "127.0.0.2:19132"
2764 .parse::<SocketAddr>()
2765 .expect("valid socket addr");
2766
2767 let cookie = server.generate_cookie(addr_a);
2768 assert!(server.verify_cookie(addr_a, Some(cookie)));
2769 assert!(!server.verify_cookie(addr_b, Some(cookie)));
2770 }
2771
2772 #[test]
2773 fn cookie_mismatch_guard_blocks_after_threshold() {
2774 let config = TransportConfig {
2775 cookie_mismatch_guard: CookieMismatchGuardConfig {
2776 enabled: true,
2777 event_window: Duration::from_secs(30),
2778 mismatch_threshold: 2,
2779 block_duration: Duration::from_secs(10),
2780 },
2781 ..TransportConfig::default()
2782 };
2783 let mut server = build_test_server(config);
2784 let ip: IpAddr = "203.0.113.9".parse().expect("valid ip");
2785 let now = Instant::now();
2786
2787 assert!(!server.record_cookie_mismatch(ip, now));
2788 assert!(server.record_cookie_mismatch(ip, now + Duration::from_millis(1)));
2789
2790 let metrics = server.metrics_snapshot();
2791 assert_eq!(metrics.cookie_mismatch_blocks, 1);
2792 assert_eq!(metrics.rate_blocked_addresses, 1);
2793 }
2794
2795 #[test]
2796 fn request2_cookie_mismatch_updates_metrics_and_uses_guard_blocking() {
2797 let rt = Builder::new_current_thread()
2798 .enable_all()
2799 .build()
2800 .expect("runtime must build");
2801 rt.block_on(async {
2802 let mut config = TransportConfig {
2803 send_cookie: true,
2804 handshake_heuristics: HandshakeHeuristicsConfig {
2805 enabled: false,
2806 ..HandshakeHeuristicsConfig::default()
2807 },
2808 cookie_mismatch_guard: CookieMismatchGuardConfig {
2809 enabled: true,
2810 event_window: Duration::from_secs(30),
2811 mismatch_threshold: 1,
2812 block_duration: Duration::from_secs(10),
2813 },
2814 ..TransportConfig::default()
2815 };
2816 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
2817 .await
2818 .expect("bind should succeed");
2819 config.bind_addr = socket.local_addr().expect("local addr should be available");
2820 let mut server =
2821 TransportServer::with_socket(config, socket).expect("server should build");
2822 let addr = "127.0.0.1:20320"
2823 .parse::<SocketAddr>()
2824 .expect("valid socket addr");
2825
2826 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
2827 protocol_version: RAKNET_PROTOCOL_VERSION,
2828 mtu: 1200,
2829 magic: DEFAULT_UNCONNECTED_MAGIC,
2830 });
2831 server
2832 .handle_offline_packet(addr, &request1, Instant::now())
2833 .await
2834 .expect("request1 handling should succeed");
2835
2836 let wrong_cookie = server.generate_cookie(addr).wrapping_add(1);
2837 let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
2838 server_addr: server.local_addr().expect("local addr should be available"),
2839 mtu: 1200,
2840 client_guid: 0xABCD_ABCD_ABCD_ABCD,
2841 cookie: Some(wrong_cookie),
2842 client_proof: true,
2843 parse_path: Request2ParsePath::StrictWithCookie,
2844 magic: DEFAULT_UNCONNECTED_MAGIC,
2845 });
2846 let result = server
2847 .handle_offline_packet(addr, &request2, Instant::now())
2848 .await
2849 .expect("request2 handling should succeed");
2850 assert!(result.is_none());
2851
2852 let metrics = server.metrics_snapshot();
2853 assert_eq!(metrics.cookie_mismatch_drops, 1);
2854 assert_eq!(metrics.cookie_mismatch_blocks, 1);
2855 assert_eq!(metrics.rate_blocked_addresses, 1);
2856 assert_eq!(metrics.rate_addresses_blocked_cookie_mismatch_guard, 1);
2857 });
2858 }
2859
2860 #[test]
2861 fn pending_handshake_timeout_updates_metrics_and_closes_session() {
2862 let mut server = build_test_server(TransportConfig::default());
2863 let addr = "127.0.0.1:20001"
2864 .parse::<SocketAddr>()
2865 .expect("valid socket addr");
2866 let now = Instant::now();
2867
2868 let mut session = Session::new(1492);
2869 assert!(TransportServer::apply_session_transitions(
2870 &mut session,
2871 &[SessionState::Req1Recv, SessionState::Reply1Sent]
2872 ));
2873 server.sessions.insert(addr, session);
2874 server.pending_handshakes.insert(
2875 addr,
2876 PendingHandshake {
2877 mtu: 1492,
2878 cookie: None,
2879 client_guid: None,
2880 stage: PendingHandshakeStage::AwaitingRequest2,
2881 expires_at: now - Duration::from_millis(1),
2882 },
2883 );
2884
2885 server.prune_pending_handshakes(now);
2886
2887 assert!(!server.sessions.contains_key(&addr));
2888 assert!(!server.pending_handshakes.contains_key(&addr));
2889
2890 let metrics = server.metrics_snapshot();
2891 assert_eq!(metrics.handshake_req1_req2_timeouts, 1);
2892 assert_eq!(metrics.handshake_stage_cancel_drops, 1);
2893 }
2894
2895 #[test]
2896 fn reply2_connect_timeout_updates_metrics_and_closes_session() {
2897 let config = TransportConfig {
2898 handshake_reply2_connect_timeout: Duration::from_secs(1),
2899 ..TransportConfig::default()
2900 };
2901 let mut server = build_test_server(config);
2902 let addr = "127.0.0.1:20002"
2903 .parse::<SocketAddr>()
2904 .expect("valid socket addr");
2905 let now = Instant::now();
2906
2907 let mut session = Session::new(1492);
2908 assert!(TransportServer::apply_session_transitions(
2909 &mut session,
2910 &[
2911 SessionState::Req1Recv,
2912 SessionState::Reply1Sent,
2913 SessionState::Req2Recv,
2914 SessionState::Reply2Sent,
2915 ]
2916 ));
2917 session.touch_activity(now - Duration::from_secs(2));
2918 server.sessions.insert(addr, session);
2919 server.pending_handshakes.insert(
2920 addr,
2921 PendingHandshake {
2922 mtu: 1492,
2923 cookie: None,
2924 client_guid: None,
2925 stage: PendingHandshakeStage::AwaitingConnectionRequest,
2926 expires_at: now - Duration::from_millis(1),
2927 },
2928 );
2929
2930 server.prune_pending_handshakes(now);
2931
2932 assert!(!server.sessions.contains_key(&addr));
2933 let metrics = server.metrics_snapshot();
2934 assert_eq!(metrics.handshake_reply2_connect_timeouts, 1);
2935 assert_eq!(metrics.handshake_stage_cancel_drops, 1);
2936 }
2937
2938 #[test]
2939 fn handshake_heuristic_blocks_after_threshold() {
2940 let config = TransportConfig {
2941 handshake_heuristics: HandshakeHeuristicsConfig {
2942 enabled: true,
2943 event_window: Duration::from_secs(30),
2944 block_duration: Duration::from_secs(10),
2945 score_threshold: 3,
2946 req1_req2_timeout_score: 1,
2947 reply2_connect_timeout_score: 1,
2948 missing_req1_score: 1,
2949 cookie_mismatch_score: 2,
2950 parse_anomaly_score: 1,
2951 },
2952 ..TransportConfig::default()
2953 };
2954 let mut server = build_test_server(config);
2955 let addr = "127.0.0.1:20003"
2956 .parse::<SocketAddr>()
2957 .expect("valid socket addr");
2958 let now = Instant::now();
2959
2960 let blocked_1 =
2961 server.record_handshake_violation(addr, HandshakeViolation::CookieMismatch, now);
2962 let blocked_2 = server.record_handshake_violation(
2963 addr,
2964 HandshakeViolation::CookieMismatch,
2965 now + Duration::from_millis(1),
2966 );
2967
2968 assert!(!blocked_1);
2969 assert!(blocked_2);
2970
2971 let metrics = server.metrics_snapshot();
2972 assert_eq!(metrics.handshake_auto_blocks, 1);
2973 assert_eq!(metrics.rate_blocked_addresses, 1);
2974 assert_eq!(metrics.rate_addresses_blocked_handshake_heuristic, 1);
2975 }
2976
2977 #[test]
2978 fn packet_id_classification_and_internal_addresses_are_consistent() {
2979 assert!(is_offline_packet_id(0x05));
2980 assert!(!is_offline_packet_id(0x03));
2981 assert!(is_connected_control_id(0x03));
2982 assert!(!is_connected_control_id(0x05));
2983
2984 let server_addr = "10.1.2.3:19132"
2985 .parse::<SocketAddr>()
2986 .expect("valid socket addr");
2987 let internal = build_internal_addrs(server_addr);
2988 assert_eq!(internal[0], server_addr);
2989 for addr in internal.iter().skip(1) {
2990 assert!(addr.ip().is_unspecified());
2991 assert_eq!(addr.port(), 0);
2992 }
2993 }
2994
2995 #[test]
2996 fn open_connection_request1_creates_session_with_transport_tunables() {
2997 let rt = Builder::new_current_thread()
2998 .enable_all()
2999 .build()
3000 .expect("runtime must build");
3001 rt.block_on(async {
3002 let mut config = TransportConfig {
3003 session_tunables: SessionTunables {
3004 outgoing_queue_max_frames: 1,
3005 outgoing_queue_max_bytes: 128,
3006 outgoing_queue_soft_ratio: 0.95,
3007 ..SessionTunables::default()
3008 },
3009 ..TransportConfig::default()
3010 };
3011 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3012 .await
3013 .expect("bind should succeed");
3014 config.bind_addr = socket.local_addr().expect("local addr should be available");
3015 let mut server =
3016 TransportServer::with_socket(config, socket).expect("server should build");
3017 let addr = "127.0.0.1:20123"
3018 .parse::<SocketAddr>()
3019 .expect("valid socket addr");
3020
3021 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3022 protocol_version: RAKNET_PROTOCOL_VERSION,
3023 mtu: 1200,
3024 magic: DEFAULT_UNCONNECTED_MAGIC,
3025 });
3026
3027 let event = server
3028 .handle_offline_packet(addr, &request1, Instant::now())
3029 .await
3030 .expect("offline request1 handling should succeed");
3031 assert!(
3032 event.is_none(),
3033 "request1 should not emit a transport event in healthy path"
3034 );
3035
3036 let session = server
3037 .sessions
3038 .get_mut(&addr)
3039 .expect("request1 must create a pending session");
3040 let first = session.queue_payload(
3041 Bytes::from_static(b"\xFEfirst"),
3042 Reliability::Unreliable,
3043 0,
3044 RakPriority::Normal,
3045 );
3046 let second = session.queue_payload(
3047 Bytes::from_static(b"\xFEsecond"),
3048 Reliability::Unreliable,
3049 0,
3050 RakPriority::Normal,
3051 );
3052
3053 assert!(
3054 matches!(first, QueuePayloadResult::Enqueued { .. }),
3055 "first packet should fit in queue"
3056 );
3057 assert_eq!(
3058 second,
3059 QueuePayloadResult::Dropped,
3060 "second packet should be dropped because configured queue max frames=1"
3061 );
3062 });
3063 }
3064
3065 #[test]
3066 fn process_next_event_uses_configured_unconnected_magic() {
3067 let rt = Builder::new_current_thread()
3068 .enable_all()
3069 .build()
3070 .expect("runtime must build");
3071
3072 rt.block_on(async {
3073 let custom_magic = [
3074 0x13, 0x57, 0x9B, 0xDF, 0x24, 0x68, 0xAC, 0xF0, 0x10, 0x32, 0x54, 0x76, 0x98, 0xBA,
3075 0xDC, 0xFE,
3076 ];
3077 let mut config = TransportConfig {
3078 unconnected_magic: custom_magic,
3079 ..TransportConfig::default()
3080 };
3081
3082 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3083 .await
3084 .expect("bind should succeed");
3085 config.bind_addr = socket.local_addr().expect("local addr should be available");
3086 let server_addr = config.bind_addr;
3087 let mut server =
3088 TransportServer::with_socket(config, socket).expect("server should build");
3089
3090 let client = tokio::net::UdpSocket::bind("127.0.0.1:0")
3091 .await
3092 .expect("client bind should succeed");
3093
3094 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3095 protocol_version: RAKNET_PROTOCOL_VERSION,
3096 mtu: 1200,
3097 magic: custom_magic,
3098 });
3099 let mut buf = BytesMut::new();
3100 request1.encode(&mut buf).expect("encode must succeed");
3101 client
3102 .send_to(&buf, server_addr)
3103 .await
3104 .expect("send should succeed");
3105
3106 let event = tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3107 .await
3108 .expect("server should receive packet in time")
3109 .expect("processing should succeed");
3110 assert!(
3111 matches!(event, TransportEvent::OfflinePacket { .. }),
3112 "matching custom magic should parse as offline packet event"
3113 );
3114 });
3115 }
3116
3117 #[test]
3118 fn process_next_event_rejects_mismatched_unconnected_magic() {
3119 let rt = Builder::new_current_thread()
3120 .enable_all()
3121 .build()
3122 .expect("runtime must build");
3123
3124 rt.block_on(async {
3125 let custom_magic = [
3126 0x13, 0x57, 0x9B, 0xDF, 0x24, 0x68, 0xAC, 0xF0, 0x10, 0x32, 0x54, 0x76, 0x98, 0xBA,
3127 0xDC, 0xFE,
3128 ];
3129 let mut config = TransportConfig {
3130 unconnected_magic: custom_magic,
3131 ..TransportConfig::default()
3132 };
3133
3134 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3135 .await
3136 .expect("bind should succeed");
3137 config.bind_addr = socket.local_addr().expect("local addr should be available");
3138 let server_addr = config.bind_addr;
3139 let mut server =
3140 TransportServer::with_socket(config, socket).expect("server should build");
3141
3142 let client = tokio::net::UdpSocket::bind("127.0.0.1:0")
3143 .await
3144 .expect("client bind should succeed");
3145
3146 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3147 protocol_version: RAKNET_PROTOCOL_VERSION,
3148 mtu: 1200,
3149 magic: DEFAULT_UNCONNECTED_MAGIC,
3150 });
3151 let mut buf = BytesMut::new();
3152 request1.encode(&mut buf).expect("encode must succeed");
3153 client
3154 .send_to(&buf, server_addr)
3155 .await
3156 .expect("send should succeed");
3157
3158 let event = tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3159 .await
3160 .expect("server should receive packet in time")
3161 .expect("processing should succeed");
3162 assert!(
3163 matches!(
3164 event,
3165 TransportEvent::DecodeError {
3166 error: DecodeError::InvalidMagic,
3167 ..
3168 }
3169 ),
3170 "mismatched magic should be rejected as invalid magic"
3171 );
3172 });
3173 }
3174
3175 #[test]
3176 fn rate_limit_config_updates_sync_with_transport_config() {
3177 let mut server = build_test_server(TransportConfig::default());
3178 server.set_rate_limit_config(TransportRateLimitConfig {
3179 per_ip_packet_limit: 0,
3180 global_packet_limit: 0,
3181 rate_window: Duration::ZERO,
3182 block_duration: Duration::ZERO,
3183 });
3184
3185 let effective = server.rate_limit_config();
3186 assert_eq!(effective.per_ip_packet_limit, 1);
3187 assert_eq!(effective.global_packet_limit, 1);
3188 assert_eq!(effective.rate_window, Duration::from_millis(1));
3189 assert_eq!(effective.block_duration, Duration::from_millis(1));
3190 assert_eq!(server.config().per_ip_packet_limit, 1);
3191 assert_eq!(server.config().global_packet_limit, 1);
3192 assert_eq!(server.config().rate_window, Duration::from_millis(1));
3193 assert_eq!(server.config().block_duration, Duration::from_millis(1));
3194 }
3195
3196 #[test]
3197 fn permanent_block_api_tracks_metrics_and_requires_manual_unblock() {
3198 let mut server = build_test_server(TransportConfig::default());
3199 let ip: IpAddr = "198.51.100.9".parse().expect("valid ip");
3200
3201 assert!(server.block_address(ip));
3202 assert!(!server.block_address(ip));
3203 let metrics_after_block = server.metrics_snapshot();
3204 assert_eq!(metrics_after_block.rate_blocked_addresses, 1);
3205 assert_eq!(metrics_after_block.rate_addresses_blocked_manual, 1);
3206
3207 assert!(server.unblock_address(ip));
3208 assert_eq!(server.metrics_snapshot().rate_blocked_addresses, 0);
3209 }
3210
3211 #[test]
3212 fn local_and_remote_disconnect_metrics_are_tracked_separately() {
3213 let mut server = build_test_server(TransportConfig::default());
3214 let addr = "127.0.0.1:20100"
3215 .parse::<SocketAddr>()
3216 .expect("valid socket addr");
3217 let mut session = Session::new(1492);
3218 assert!(TransportServer::apply_session_transitions(
3219 &mut session,
3220 &[
3221 SessionState::Req1Recv,
3222 SessionState::Reply1Sent,
3223 SessionState::Req2Recv,
3224 SessionState::Reply2Sent,
3225 SessionState::ConnReqRecv,
3226 SessionState::ConnReqAcceptedSent,
3227 SessionState::NewIncomingRecv,
3228 SessionState::Connected,
3229 ],
3230 ));
3231 server.sessions.insert(addr, session);
3232
3233 assert!(server.disconnect_peer(addr));
3234 server.record_remote_disconnect(RemoteDisconnectReason::DisconnectionNotification {
3235 reason_code: Some(3),
3236 });
3237 server.record_remote_disconnect(RemoteDisconnectReason::DetectLostConnection);
3238
3239 let metrics = server.metrics_snapshot();
3240 assert_eq!(metrics.local_requested_disconnects, 1);
3241 assert_eq!(metrics.remote_disconnect_notifications, 1);
3242 assert_eq!(metrics.remote_detect_lost_disconnects, 1);
3243 assert_eq!(metrics.sessions_closed_total, 1);
3244 }
3245
3246 #[test]
3247 fn processing_budget_config_updates_sync_with_transport_config() {
3248 let mut server = build_test_server(TransportConfig::default());
3249 server.set_processing_budget_config(TransportProcessingBudgetConfig {
3250 enabled: true,
3251 per_ip_refill_units_per_sec: 0,
3252 per_ip_burst_units: 0,
3253 global_refill_units_per_sec: 0,
3254 global_burst_units: 0,
3255 bucket_idle_ttl: Duration::ZERO,
3256 });
3257
3258 let effective = server.processing_budget_config();
3259 assert!(effective.enabled);
3260 assert_eq!(effective.per_ip_refill_units_per_sec, 1);
3261 assert_eq!(effective.per_ip_burst_units, 1);
3262 assert_eq!(effective.global_refill_units_per_sec, 1);
3263 assert_eq!(effective.global_burst_units, 1);
3264 assert_eq!(effective.bucket_idle_ttl, Duration::from_millis(1));
3265 assert_eq!(
3266 server
3267 .config()
3268 .processing_budget
3269 .per_ip_refill_units_per_sec,
3270 1
3271 );
3272 assert_eq!(server.config().processing_budget.per_ip_burst_units, 1);
3273 assert_eq!(
3274 server
3275 .config()
3276 .processing_budget
3277 .global_refill_units_per_sec,
3278 1
3279 );
3280 assert_eq!(server.config().processing_budget.global_burst_units, 1);
3281 assert_eq!(
3282 server.config().processing_budget.bucket_idle_ttl,
3283 Duration::from_millis(1)
3284 );
3285 }
3286
3287 #[test]
3288 fn processing_budget_exhaustion_drops_datagram_without_forcing_disconnect() {
3289 let rt = Builder::new_current_thread()
3290 .enable_all()
3291 .build()
3292 .expect("runtime must build");
3293 rt.block_on(async {
3294 let mut config = TransportConfig {
3295 per_ip_packet_limit: 10_000,
3296 global_packet_limit: 10_000,
3297 processing_budget: ProcessingBudgetConfig {
3298 enabled: true,
3299 per_ip_refill_units_per_sec: 1,
3300 per_ip_burst_units: 1,
3301 global_refill_units_per_sec: 10_000,
3302 global_burst_units: 10_000,
3303 bucket_idle_ttl: Duration::from_secs(5),
3304 },
3305 ..TransportConfig::default()
3306 };
3307
3308 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3309 .await
3310 .expect("bind should succeed");
3311 config.bind_addr = socket.local_addr().expect("local addr should be available");
3312 let server_addr = config.bind_addr;
3313 let mut server =
3314 TransportServer::with_socket(config, socket).expect("server should build");
3315
3316 let client = tokio::net::UdpSocket::bind("127.0.0.1:0")
3317 .await
3318 .expect("client bind should succeed");
3319 let client_addr = client.local_addr().expect("client local addr");
3320
3321 let mut session = Session::new(1492);
3322 assert!(TransportServer::apply_session_transitions(
3323 &mut session,
3324 &[
3325 SessionState::Req1Recv,
3326 SessionState::Reply1Sent,
3327 SessionState::Req2Recv,
3328 SessionState::Reply2Sent,
3329 SessionState::ConnReqRecv,
3330 SessionState::ConnReqAcceptedSent,
3331 SessionState::NewIncomingRecv,
3332 SessionState::Connected,
3333 ],
3334 ));
3335 server.sessions.insert(client_addr, session);
3336
3337 let make_datagram = |seq: u32| Datagram {
3338 header: DatagramHeader {
3339 flags: DatagramFlags::VALID,
3340 sequence: Sequence24::new(seq),
3341 },
3342 payload: DatagramPayload::Frames(vec![Frame {
3343 header: FrameHeader::new(Reliability::Unreliable, false, false),
3344 bit_length: (8u16) << 3,
3345 reliable_index: None,
3346 sequence_index: None,
3347 ordering_index: None,
3348 ordering_channel: None,
3349 split: None,
3350 payload: Bytes::from_static(b"payload!"),
3351 }]),
3352 };
3353
3354 let mut first = BytesMut::new();
3355 make_datagram(1)
3356 .encode(&mut first)
3357 .expect("datagram encode should succeed");
3358 client
3359 .send_to(&first, server_addr)
3360 .await
3361 .expect("send first datagram should succeed");
3362 let first_event =
3363 tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3364 .await
3365 .expect("first recv should complete")
3366 .expect("first recv should succeed");
3367 assert!(
3368 matches!(first_event, TransportEvent::ConnectedFrames { .. }),
3369 "first datagram should pass before budget is depleted"
3370 );
3371
3372 let mut second = BytesMut::new();
3373 make_datagram(2)
3374 .encode(&mut second)
3375 .expect("datagram encode should succeed");
3376 client
3377 .send_to(&second, server_addr)
3378 .await
3379 .expect("send second datagram should succeed");
3380 let second_event =
3381 tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3382 .await
3383 .expect("second recv should complete")
3384 .expect("second recv should succeed");
3385 assert!(
3386 matches!(second_event, TransportEvent::RateLimited { addr } if addr == client_addr),
3387 "second datagram should be dropped by processing budget"
3388 );
3389
3390 assert!(
3391 server.sessions.contains_key(&client_addr),
3392 "budget drop must not force disconnect"
3393 );
3394 let metrics = server.metrics_snapshot();
3395 assert_eq!(metrics.processing_budget_drops_total, 1);
3396 assert_eq!(metrics.processing_budget_drops_ip_exhausted_total, 1);
3397 });
3398 }
3399
3400 #[test]
3401 fn open_connection_request2_rejects_already_connected_session() {
3402 let rt = Builder::new_current_thread()
3403 .enable_all()
3404 .build()
3405 .expect("runtime must build");
3406 rt.block_on(async {
3407 let mut config = TransportConfig::default();
3408 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3409 .await
3410 .expect("bind should succeed");
3411 config.bind_addr = socket.local_addr().expect("local addr should be available");
3412 let mut server =
3413 TransportServer::with_socket(config, socket).expect("server should build");
3414 let addr = "127.0.0.1:20310"
3415 .parse::<SocketAddr>()
3416 .expect("valid socket addr");
3417
3418 let mut session = Session::new(1492);
3419 assert!(TransportServer::apply_session_transitions(
3420 &mut session,
3421 &[
3422 SessionState::Req1Recv,
3423 SessionState::Reply1Sent,
3424 SessionState::Req2Recv,
3425 SessionState::Reply2Sent,
3426 SessionState::ConnReqRecv,
3427 SessionState::ConnReqAcceptedSent,
3428 SessionState::NewIncomingRecv,
3429 SessionState::Connected,
3430 ]
3431 ));
3432 server.sessions.insert(addr, session);
3433
3434 let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3435 server_addr: server.local_addr().expect("local addr should be available"),
3436 mtu: 1200,
3437 client_guid: 0xAABB_CCDD_EEFF_0011,
3438 cookie: None,
3439 client_proof: false,
3440 parse_path: Request2ParsePath::StrictNoCookie,
3441 magic: DEFAULT_UNCONNECTED_MAGIC,
3442 });
3443
3444 let event = server
3445 .handle_offline_packet(addr, &request2, Instant::now())
3446 .await
3447 .expect("request2 handling should succeed");
3448 assert!(event.is_none(), "reject path must not emit transport event");
3449
3450 let metrics = server.metrics_snapshot();
3451 assert_eq!(metrics.handshake_already_connected_rejects, 1);
3452 assert_eq!(metrics.handshake_missing_req1_drops, 0);
3453 });
3454 }
3455
3456 #[test]
3457 fn open_connection_request1_rejects_ip_recently_connected() {
3458 let rt = Builder::new_current_thread()
3459 .enable_all()
3460 .build()
3461 .expect("runtime must build");
3462 rt.block_on(async {
3463 let mut config = TransportConfig {
3464 ip_recently_connected_window: Duration::from_secs(3),
3465 ..TransportConfig::default()
3466 };
3467 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3468 .await
3469 .expect("bind should succeed");
3470 config.bind_addr = socket.local_addr().expect("local addr should be available");
3471 let mut server =
3472 TransportServer::with_socket(config, socket).expect("server should build");
3473 let addr = "127.0.0.1:20311"
3474 .parse::<SocketAddr>()
3475 .expect("valid socket addr");
3476
3477 let mut session = Session::new(1492);
3478 assert!(TransportServer::apply_session_transitions(
3479 &mut session,
3480 &[
3481 SessionState::Req1Recv,
3482 SessionState::Reply1Sent,
3483 SessionState::Req2Recv,
3484 SessionState::Reply2Sent,
3485 SessionState::ConnReqRecv,
3486 SessionState::ConnReqAcceptedSent,
3487 SessionState::NewIncomingRecv,
3488 SessionState::Connected,
3489 ]
3490 ));
3491 server.sessions.insert(addr, session);
3492 server.close_session(addr);
3493
3494 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3495 protocol_version: RAKNET_PROTOCOL_VERSION,
3496 mtu: 1200,
3497 magic: DEFAULT_UNCONNECTED_MAGIC,
3498 });
3499
3500 let event = server
3501 .handle_offline_packet(addr, &request1, Instant::now())
3502 .await
3503 .expect("request1 handling should succeed");
3504 assert!(event.is_none(), "reject path must not emit transport event");
3505 assert!(
3506 !server.pending_handshakes.contains_key(&addr),
3507 "rejected address must not create pending handshake"
3508 );
3509 assert!(
3510 !server.sessions.contains_key(&addr),
3511 "rejected address must not create session"
3512 );
3513
3514 let metrics = server.metrics_snapshot();
3515 assert_eq!(metrics.handshake_ip_recently_connected_rejects, 1);
3516 });
3517 }
3518
3519 #[test]
3520 fn open_connection_request2_drops_when_server_addr_policy_rejects_port() {
3521 let rt = Builder::new_current_thread()
3522 .enable_all()
3523 .build()
3524 .expect("runtime must build");
3525 rt.block_on(async {
3526 let mut config = TransportConfig {
3527 request2_server_addr_policy: Request2ServerAddrPolicy::PortOnly,
3528 ..TransportConfig::default()
3529 };
3530 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3531 .await
3532 .expect("bind should succeed");
3533 config.bind_addr = socket.local_addr().expect("local addr should be available");
3534 let mut server =
3535 TransportServer::with_socket(config, socket).expect("server should build");
3536 let addr = "127.0.0.1:20312"
3537 .parse::<SocketAddr>()
3538 .expect("valid socket addr");
3539
3540 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3541 protocol_version: RAKNET_PROTOCOL_VERSION,
3542 mtu: 1200,
3543 magic: DEFAULT_UNCONNECTED_MAGIC,
3544 });
3545 let event = server
3546 .handle_offline_packet(addr, &request1, Instant::now())
3547 .await
3548 .expect("request1 handling should succeed");
3549 assert!(event.is_none());
3550 assert!(
3551 server.pending_handshakes.contains_key(&addr),
3552 "request1 should create pending handshake"
3553 );
3554
3555 let local_addr = server.local_addr().expect("local addr should be available");
3556 let wrong_port = local_addr.port().saturating_add(1);
3557 let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3558 server_addr: SocketAddr::from(([203, 0, 113, 10], wrong_port)),
3559 mtu: 1200,
3560 client_guid: 0x0123_4567_89AB_CDEF,
3561 cookie: None,
3562 client_proof: false,
3563 parse_path: Request2ParsePath::StrictNoCookie,
3564 magic: DEFAULT_UNCONNECTED_MAGIC,
3565 });
3566
3567 let event = server
3568 .handle_offline_packet(addr, &request2, Instant::now())
3569 .await
3570 .expect("request2 handling should succeed");
3571 assert!(event.is_none(), "policy mismatch should be silent drop");
3572 assert!(
3573 server.pending_handshakes.contains_key(&addr),
3574 "policy mismatch must keep pending handshake for retry"
3575 );
3576
3577 let metrics = server.metrics_snapshot();
3578 assert_eq!(metrics.request2_server_addr_mismatch_drops, 1);
3579 assert_eq!(metrics.handshake_stage_cancel_drops, 1);
3580 assert_eq!(metrics.handshake_missing_req1_drops, 0);
3581 });
3582 }
3583
3584 #[test]
3585 fn open_connection_request2_accepts_mismatch_when_policy_disabled() {
3586 let rt = Builder::new_current_thread()
3587 .enable_all()
3588 .build()
3589 .expect("runtime must build");
3590 rt.block_on(async {
3591 let mut config = TransportConfig {
3592 request2_server_addr_policy: Request2ServerAddrPolicy::Disabled,
3593 ..TransportConfig::default()
3594 };
3595 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3596 .await
3597 .expect("bind should succeed");
3598 config.bind_addr = socket.local_addr().expect("local addr should be available");
3599 let mut server =
3600 TransportServer::with_socket(config, socket).expect("server should build");
3601 let addr = "127.0.0.1:20313"
3602 .parse::<SocketAddr>()
3603 .expect("valid socket addr");
3604
3605 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3606 protocol_version: RAKNET_PROTOCOL_VERSION,
3607 mtu: 1200,
3608 magic: DEFAULT_UNCONNECTED_MAGIC,
3609 });
3610 server
3611 .handle_offline_packet(addr, &request1, Instant::now())
3612 .await
3613 .expect("request1 handling should succeed");
3614
3615 let local_addr = server.local_addr().expect("local addr should be available");
3616 let wrong_port = local_addr.port().saturating_add(1);
3617 let cookie = server.generate_cookie(addr);
3618 let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3619 server_addr: SocketAddr::from(([203, 0, 113, 10], wrong_port)),
3620 mtu: 1200,
3621 client_guid: 0x1020_3040_5060_7080,
3622 cookie: Some(cookie),
3623 client_proof: true,
3624 parse_path: Request2ParsePath::StrictWithCookie,
3625 magic: DEFAULT_UNCONNECTED_MAGIC,
3626 });
3627
3628 let event = server
3629 .handle_offline_packet(addr, &request2, Instant::now())
3630 .await
3631 .expect("request2 handling should succeed");
3632 assert!(event.is_none(), "successful request2 should not emit event");
3633
3634 assert!(
3635 server.pending_handshakes.contains_key(&addr),
3636 "accepted request2 must keep pending handshake until connected"
3637 );
3638 let session = server
3639 .sessions
3640 .get(&addr)
3641 .expect("accepted request2 must keep session");
3642 assert_eq!(session.state(), SessionState::Reply2Sent);
3643
3644 let metrics = server.metrics_snapshot();
3645 assert_eq!(metrics.request2_server_addr_mismatch_drops, 0);
3646 });
3647 }
3648
3649 #[test]
3650 fn open_connection_request2_retry_resends_reply2_without_missing_req1_penalty() {
3651 let rt = Builder::new_current_thread()
3652 .enable_all()
3653 .build()
3654 .expect("runtime must build");
3655 rt.block_on(async {
3656 let mut config = TransportConfig::default();
3657 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3658 .await
3659 .expect("bind should succeed");
3660 config.bind_addr = socket.local_addr().expect("local addr should be available");
3661 let mut server =
3662 TransportServer::with_socket(config, socket).expect("server should build");
3663 let addr = "127.0.0.1:20314"
3664 .parse::<SocketAddr>()
3665 .expect("valid socket addr");
3666
3667 let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3668 protocol_version: RAKNET_PROTOCOL_VERSION,
3669 mtu: 1200,
3670 magic: DEFAULT_UNCONNECTED_MAGIC,
3671 });
3672 server
3673 .handle_offline_packet(addr, &request1, Instant::now())
3674 .await
3675 .expect("request1 handling should succeed");
3676
3677 let cookie = server.generate_cookie(addr);
3678 let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3679 server_addr: server.local_addr().expect("local addr should be available"),
3680 mtu: 1200,
3681 client_guid: 0x1111_2222_3333_4444,
3682 cookie: Some(cookie),
3683 client_proof: true,
3684 parse_path: Request2ParsePath::StrictWithCookie,
3685 magic: DEFAULT_UNCONNECTED_MAGIC,
3686 });
3687
3688 let first = server
3689 .handle_offline_packet(addr, &request2, Instant::now())
3690 .await
3691 .expect("first request2 should succeed");
3692 assert!(first.is_none());
3693
3694 let second = server
3695 .handle_offline_packet(addr, &request2, Instant::now() + Duration::from_millis(10))
3696 .await
3697 .expect("retry request2 should succeed");
3698 assert!(second.is_none());
3699
3700 let pending = server
3701 .pending_handshakes
3702 .get(&addr)
3703 .copied()
3704 .expect("pending handshake should remain until connected");
3705 assert_eq!(
3706 pending.stage,
3707 PendingHandshakeStage::AwaitingConnectionRequest
3708 );
3709
3710 let session = server
3711 .sessions
3712 .get(&addr)
3713 .expect("session should exist during post-reply2 handshake stage");
3714 assert_eq!(session.state(), SessionState::Reply2Sent);
3715
3716 let metrics = server.metrics_snapshot();
3717 assert_eq!(metrics.handshake_missing_req1_drops, 0);
3718 assert_eq!(metrics.handshake_stage_cancel_drops, 0);
3719 });
3720 }
3721
3722 #[test]
3723 fn bind_shards_requires_reuse_port_when_count_is_greater_than_one() {
3724 let config = TransportConfig {
3725 reuse_port: false,
3726 ..TransportConfig::default()
3727 };
3728 let rt = Builder::new_current_thread()
3729 .enable_all()
3730 .build()
3731 .expect("runtime must build");
3732 match rt.block_on(TransportServer::bind_shards(config, 2)) {
3733 Ok(_) => panic!("should reject shard_count > 1 without reuse_port"),
3734 Err(err) => assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput),
3735 }
3736 }
3737
3738 #[test]
3739 fn bind_shard_plan_enables_both_families_for_split_mode() {
3740 let config = TransportConfig {
3741 bind_addr: SocketAddr::from(([0, 0, 0, 0], 19132)),
3742 split_ipv4_ipv6_bind: true,
3743 ..TransportConfig::default()
3744 };
3745 let plan = TransportServer::build_shard_bind_plan(&config, 1);
3746 let expected_v4 = SocketAddr::from(([0, 0, 0, 0], 19132));
3747 let expected_v6 = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 19132, 0, 0));
3748 assert_eq!(plan.len(), 2);
3749 assert!(plan.contains(&expected_v4));
3750 assert!(plan.contains(&expected_v6));
3751 }
3752
3753 #[test]
3754 fn bind_shard_plan_round_robins_families_when_split_mode_exceeds_two_workers() {
3755 let config = TransportConfig {
3756 bind_addr: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 19132)),
3757 split_ipv4_ipv6_bind: true,
3758 ..TransportConfig::default()
3759 };
3760 let plan = TransportServer::build_shard_bind_plan(&config, 3);
3761 let expected_v4 = SocketAddr::from(([0, 0, 0, 0], 19132));
3762 let expected_v6 = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 19132, 0, 0));
3763 assert_eq!(plan, vec![expected_v4, expected_v6, expected_v4]);
3764 assert!(TransportServer::has_duplicate_bind_targets(&plan));
3765 }
3766
3767 #[test]
3768 fn bind_shards_split_mode_rejects_duplicate_workers_without_reuse_port() {
3769 let config = TransportConfig {
3770 reuse_port: false,
3771 split_ipv4_ipv6_bind: true,
3772 ..TransportConfig::default()
3773 };
3774 let rt = Builder::new_current_thread()
3775 .enable_all()
3776 .build()
3777 .expect("runtime must build");
3778 match rt.block_on(TransportServer::bind_shards(config, 3)) {
3779 Ok(_) => panic!("should reject duplicate split bind targets without reuse_port"),
3780 Err(err) => assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput),
3781 }
3782 }
3783
3784 #[test]
3785 fn bind_rejects_split_mode_in_single_socket_path() {
3786 let config = TransportConfig {
3787 split_ipv4_ipv6_bind: true,
3788 ..TransportConfig::default()
3789 };
3790 let rt = Builder::new_current_thread()
3791 .enable_all()
3792 .build()
3793 .expect("runtime must build");
3794 match rt.block_on(TransportServer::bind(config)) {
3795 Ok(_) => panic!("single-socket bind must reject split mode"),
3796 Err(err) => assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput),
3797 }
3798 }
3799
3800 #[test]
3801 #[cfg(any(
3802 target_os = "linux",
3803 target_os = "android",
3804 target_os = "macos",
3805 target_os = "ios",
3806 target_os = "freebsd",
3807 target_os = "netbsd",
3808 target_os = "openbsd"
3809 ))]
3810 fn bind_accepts_socket_tuning_profile() {
3811 let config = TransportConfig {
3812 socket_tuning: TransportSocketTuning {
3813 recv_buffer_size: Some(512 * 1024),
3814 send_buffer_size: Some(512 * 1024),
3815 ipv4_ttl: Some(64),
3816 ipv4_tos: Some(0x10),
3817 ipv6_unicast_hops: None,
3818 disable_ip_fragmentation: false,
3819 },
3820 ..TransportConfig::default()
3821 };
3822 let rt = Builder::new_current_thread()
3823 .enable_all()
3824 .build()
3825 .expect("runtime must build");
3826 let server = rt
3827 .block_on(TransportServer::bind(config))
3828 .expect("bind with valid socket tuning should succeed");
3829 let _addr = server.local_addr().expect("server must have local addr");
3830 }
3831
3832 #[test]
3833 #[cfg(any(target_os = "linux", target_os = "android"))]
3834 fn bind_accepts_disable_ip_fragmentation_on_linux_like_targets() {
3835 let config = TransportConfig {
3836 socket_tuning: TransportSocketTuning {
3837 disable_ip_fragmentation: true,
3838 ..TransportSocketTuning::default()
3839 },
3840 ..TransportConfig::default()
3841 };
3842 let rt = Builder::new_current_thread()
3843 .enable_all()
3844 .build()
3845 .expect("runtime must build");
3846 let server = rt
3847 .block_on(TransportServer::bind(config))
3848 .expect("disable_ip_fragmentation should be supported");
3849 let _addr = server.local_addr().expect("server must have local addr");
3850 }
3851
3852 #[test]
3853 #[cfg(any(
3854 target_os = "linux",
3855 target_os = "android",
3856 target_os = "macos",
3857 target_os = "ios",
3858 target_os = "freebsd",
3859 target_os = "netbsd",
3860 target_os = "openbsd"
3861 ))]
3862 fn platform_reports_sharded_reuse_port_support() {
3863 assert!(TransportServer::supports_reuse_port_sharded_bind());
3864 }
3865
3866 #[test]
3867 #[cfg(not(any(
3868 target_os = "linux",
3869 target_os = "android",
3870 target_os = "macos",
3871 target_os = "ios",
3872 target_os = "freebsd",
3873 target_os = "netbsd",
3874 target_os = "openbsd"
3875 )))]
3876 fn bind_shards_uses_shared_socket_fallback_on_unsupported_platform() {
3877 assert!(!TransportServer::supports_reuse_port_sharded_bind());
3878 let config = TransportConfig {
3879 reuse_port: true,
3880 ..TransportConfig::default()
3881 };
3882 let rt = Builder::new_current_thread()
3883 .enable_all()
3884 .build()
3885 .expect("runtime must build");
3886 let workers = rt
3887 .block_on(TransportServer::bind_shards(config, 2))
3888 .expect("fallback bind should succeed");
3889 assert_eq!(workers.len(), 2);
3890 let first_addr = workers[0].local_addr().expect("worker local addr");
3891 for worker in workers.iter().skip(1) {
3892 let addr = worker.local_addr().expect("worker local addr");
3893 assert_eq!(addr, first_addr);
3894 }
3895 }
3896}