Skip to main content

raknet_rust/transport/
server.rs

1use std::collections::{HashMap, HashSet};
2use std::io;
3use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4use std::sync::Arc;
5use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6#[cfg(any(target_os = "linux", target_os = "android"))]
7use std::{mem, os::fd::AsRawFd};
8
9use bytes::{Bytes, BytesMut};
10use hmac::{Hmac, Mac};
11use sha2::Sha256;
12use socket2::{Domain, Protocol, Socket, Type};
13use tokio::net::UdpSocket;
14use tracing::{debug, warn};
15use zeroize::Zeroizing;
16
17use crate::error::DecodeError;
18use crate::handshake::{
19    AlreadyConnected, ConnectionBanned, IncompatibleProtocolVersion, IpRecentlyConnected,
20    NoFreeIncomingConnections, OfflinePacket, OpenConnectionReply1, OpenConnectionReply2,
21    Request2ParsePath, UnconnectedPong,
22};
23use crate::protocol::connected::{
24    ConnectedControlPacket, ConnectedPing, ConnectedPong, ConnectionRequestAccepted,
25    SYSTEM_ADDRESS_COUNT,
26};
27use crate::protocol::constants::{MAXIMUM_MTU_SIZE, MINIMUM_MTU_SIZE, RAKNET_PROTOCOL_VERSION};
28use crate::protocol::datagram::{Datagram, DatagramPayload};
29use crate::protocol::frame::Frame;
30use crate::protocol::reliability::Reliability;
31use crate::protocol::sequence24::Sequence24;
32use crate::session::{
33    QueuePayloadResult, RakPriority, ReceiptProgress, Session, SessionMetricsSnapshot, SessionState,
34};
35
36use super::config::{
37    ProcessingBudgetConfig, Request2ServerAddrPolicy, TransportConfig, TransportSocketTuning,
38};
39use super::proxy::{InboundProxyRoute, OutboundProxyRoute, ProxyRouter};
40use super::rate_limiter::{
41    BlockReason, ProcessingBudgetDecision, ProcessingBudgetMetricsSnapshot, RateLimitDecision,
42    RateLimiter, RateLimiterConfigSnapshot, RateLimiterMetricsSnapshot,
43};
44use super::session_pipeline::{
45    PipelineFrameAction, SessionPipeline, SessionPipelineMetricsSnapshot,
46};
47
48const RECV_PATH_MAX_NEW_DATAGRAMS: usize = 6;
49const RECV_PATH_MAX_RESEND_DATAGRAMS: usize = 6;
50const COOKIE_KEY_LEN: usize = 32;
51
52type HmacSha256 = Hmac<Sha256>;
53type SecretCookieKey = Zeroizing<[u8; COOKIE_KEY_LEN]>;
54
55#[derive(Debug)]
56pub struct ConnectedFrameDelivery {
57    pub payload: Bytes,
58    pub reliability: Reliability,
59    pub reliable_index: Option<Sequence24>,
60    pub sequence_index: Option<Sequence24>,
61    pub ordering_index: Option<Sequence24>,
62    pub ordering_channel: Option<u8>,
63}
64
65impl ConnectedFrameDelivery {
66    fn from_frame(frame: Frame) -> Self {
67        Self {
68            payload: frame.payload,
69            reliability: frame.header.reliability,
70            reliable_index: frame.reliable_index,
71            sequence_index: frame.sequence_index,
72            ordering_index: frame.ordering_index,
73            ordering_channel: frame.ordering_channel,
74        }
75    }
76}
77
/// Why the remote peer's session was closed, as reported in
/// `TransportEvent::PeerDisconnected`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RemoteDisconnectReason {
    /// A disconnection notification was received, with an optional reason code.
    DisconnectionNotification { reason_code: Option<u8> },
    /// A "detect lost connection" control message was received.
    // NOTE(review): the producing code is outside this view; confirm exact trigger.
    DetectLostConnection,
}
83
84#[derive(Debug)]
85pub enum TransportEvent {
86    RateLimited {
87        addr: SocketAddr,
88    },
89    ProxyDropped {
90        addr: SocketAddr,
91    },
92    SessionLimitReached {
93        addr: SocketAddr,
94    },
95    OfflinePacket {
96        addr: SocketAddr,
97        packet: OfflinePacket,
98    },
99    ConnectedFrames {
100        addr: SocketAddr,
101        client_guid: Option<u64>,
102        frames: Vec<ConnectedFrameDelivery>,
103        frame_count: usize,
104        receipts: ReceiptProgress,
105    },
106    ConnectedDatagramDroppedNoSession {
107        addr: SocketAddr,
108    },
109    PeerDisconnected {
110        addr: SocketAddr,
111        reason: RemoteDisconnectReason,
112    },
113    DecodeError {
114        addr: SocketAddr,
115        error: DecodeError,
116    },
117}
118
/// Result of asking the transport to queue an outgoing payload.
// NOTE(review): the producing method is outside this view; variant meanings
// below are read off the names and should be confirmed against it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueueDispatchResult {
    /// No session exists for the target address.
    MissingSession,
    /// The payload was queued; `reliable_bytes` is reported alongside.
    Enqueued { reliable_bytes: usize },
    /// The payload was dropped.
    Dropped,
    /// The payload was deferred.
    Deferred,
    /// The session was disconnected.
    Disconnected,
}
127
/// Packet rate-limit settings exposed by the transport.
///
/// The field names mirror the four values `with_socket` passes to
/// `RateLimiter::new`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TransportRateLimitConfig {
    /// Packet limit applied per source IP.
    pub per_ip_packet_limit: usize,
    /// Packet limit applied across all sources.
    pub global_packet_limit: usize,
    /// Length of the rate accounting window.
    pub rate_window: Duration,
    /// How long a block lasts.
    pub block_duration: Duration,
}
135
/// Processing-budget settings exposed by the transport.
// NOTE(review): the consumer of these values is outside this view; the
// per-field notes restate the names only.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TransportProcessingBudgetConfig {
    /// Whether the processing budget is enabled.
    pub enabled: bool,
    /// Per-IP refill rate, in units per second.
    pub per_ip_refill_units_per_sec: u32,
    /// Per-IP burst size, in units.
    pub per_ip_burst_units: u32,
    /// Global refill rate, in units per second.
    pub global_refill_units_per_sec: u32,
    /// Global burst size, in units.
    pub global_burst_units: u32,
    /// Idle time-to-live for a bucket.
    pub bucket_idle_ttl: Duration,
}
145
/// Point-in-time copy of the transport's counters and gauges.
///
/// `Default` yields an all-zero snapshot. The section comments below only
/// group fields by name prefix; the code that fills the snapshot is outside
/// this view.
#[derive(Clone, Copy, Debug, Default)]
pub struct TransportMetricsSnapshot {
    // --- Sessions and forwarding ---
    pub session_count: usize,
    pub sessions_started_total: u64,
    pub sessions_closed_total: u64,
    pub packets_forwarded_total: u64,
    pub bytes_forwarded_total: u64,

    // --- Pending queues ---
    pub pending_outgoing_frames: usize,
    pub pending_outgoing_bytes: usize,
    pub pending_unhandled_frames: usize,
    pub pending_unhandled_bytes: usize,

    // --- Ingress and reliability-layer drops ---
    pub ingress_datagrams: u64,
    pub ingress_frames: u64,
    pub duplicate_reliable_drops: u64,
    pub ordered_stale_drops: u64,
    pub ordered_buffer_full_drops: u64,
    pub sequenced_stale_drops: u64,
    pub sequenced_missing_index_drops: u64,

    // --- Egress, ACK/NACK ---
    pub reliable_sent_datagrams: u64,
    pub resent_datagrams: u64,
    pub ack_out_total: u64,
    pub nack_out_total: u64,
    pub acked_datagrams: u64,
    pub nacked_datagrams: u64,
    pub split_ttl_drops: u64,

    // --- Outgoing queue and backpressure ---
    pub outgoing_queue_drops: u64,
    pub outgoing_queue_defers: u64,
    pub outgoing_queue_disconnects: u64,
    pub backpressure_delays: u64,
    pub backpressure_drops: u64,
    pub backpressure_disconnects: u64,

    // --- Disconnects, timeouts, keepalive ---
    pub local_requested_disconnects: u64,
    pub remote_disconnect_notifications: u64,
    pub remote_detect_lost_disconnects: u64,
    pub illegal_state_transitions: u64,
    pub timed_out_sessions: u64,
    pub keepalive_pings_sent: u64,

    // --- Unhandled-frame queue ---
    pub unhandled_frames_queued: u64,
    pub unhandled_frames_flushed: u64,
    pub unhandled_frames_dropped: u64,

    // --- Rate limiter ---
    pub rate_global_limit_hits: u64,
    pub rate_ip_block_hits: u64,
    pub rate_ip_block_hits_rate_exceeded: u64,
    pub rate_ip_block_hits_manual: u64,
    pub rate_ip_block_hits_handshake_heuristic: u64,
    pub rate_ip_block_hits_cookie_mismatch_guard: u64,
    pub rate_addresses_blocked: u64,
    pub rate_addresses_blocked_rate_exceeded: u64,
    pub rate_addresses_blocked_manual: u64,
    pub rate_addresses_blocked_handshake_heuristic: u64,
    pub rate_addresses_blocked_cookie_mismatch_guard: u64,
    pub rate_addresses_unblocked: u64,
    pub rate_blocked_addresses: usize,
    pub rate_exception_addresses: usize,

    // --- Processing budget ---
    pub processing_budget_drops_total: u64,
    pub processing_budget_drops_ip_exhausted_total: u64,
    pub processing_budget_drops_global_exhausted_total: u64,
    pub processing_budget_consumed_units_total: u64,
    pub processing_budget_active_ip_buckets: usize,

    // --- Cookies and handshake ---
    pub cookie_rotations: u64,
    pub cookie_mismatch_drops: u64,
    pub cookie_mismatch_blocks: u64,
    pub handshake_stage_cancel_drops: u64,
    pub handshake_req1_req2_timeouts: u64,
    pub handshake_reply2_connect_timeouts: u64,
    pub handshake_missing_req1_drops: u64,
    pub handshake_auto_blocks: u64,
    pub handshake_already_connected_rejects: u64,
    pub handshake_ip_recently_connected_rejects: u64,

    // --- Request2 parsing ---
    pub request2_server_addr_mismatch_drops: u64,
    pub request2_legacy_parse_hits: u64,
    pub request2_legacy_drops: u64,
    pub request2_ambiguous_parse_hits: u64,
    pub request2_ambiguous_drops: u64,

    // --- Proxy routing ---
    pub proxy_inbound_reroutes: u64,
    pub proxy_inbound_drops: u64,
    pub proxy_outbound_reroutes: u64,
    pub proxy_outbound_drops: u64,

    // --- Averages and ratios ---
    pub avg_srtt_ms: f64,
    pub avg_rttvar_ms: f64,
    pub avg_resend_rto_ms: f64,
    pub avg_congestion_window_packets: f64,
    pub resend_ratio: f64,
}
230
/// Which step an in-progress handshake is waiting on.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PendingHandshakeStage {
    /// Waiting for the peer's second open-connection request.
    AwaitingRequest2,
    /// Waiting for the peer's connection request.
    AwaitingConnectionRequest,
}
236
237#[derive(Debug, Clone, Copy)]
238struct PendingHandshake {
239    mtu: u16,
240    cookie: Option<u32>,
241    client_guid: Option<u64>,
242    stage: PendingHandshakeStage,
243    expires_at: Instant,
244}
245
/// Per-IP handshake heuristic bookkeeping: a window start plus a running score.
// NOTE(review): scoring/expiry logic lives outside this view.
#[derive(Clone, Copy, Debug)]
struct HandshakeHeuristicState {
    /// When the current scoring window started.
    window_started_at: Instant,
    /// Score accumulated so far.
    score: u32,
}
251
/// Per-IP cookie-mismatch bookkeeping: a window start plus a mismatch count.
// NOTE(review): the guard logic that updates this lives outside this view.
#[derive(Clone, Copy, Debug)]
struct CookieMismatchGuardState {
    /// When the current counting window started.
    window_started_at: Instant,
    /// Mismatches counted so far.
    mismatches: u32,
}
257
/// Kinds of handshake misbehaviour; each maps to a configured score via
/// `HandshakeViolation::score`.
#[derive(Clone, Copy, Debug)]
enum HandshakeViolation {
    /// Scored with `req1_req2_timeout_score`.
    Req1Req2Timeout,
    /// Scored with `reply2_connect_timeout_score`.
    Reply2ConnectTimeout,
    /// Scored with `missing_req1_score`.
    MissingPendingReq1,
    /// Scored with `cookie_mismatch_score`.
    CookieMismatch,
    /// Scored with `parse_anomaly_score`.
    ParseAnomalyDrop,
}
266
267impl HandshakeViolation {
268    fn score(self, config: &TransportConfig) -> u32 {
269        let h = config.handshake_heuristics;
270        match self {
271            Self::Req1Req2Timeout => h.req1_req2_timeout_score,
272            Self::Reply2ConnectTimeout => h.reply2_connect_timeout_score,
273            Self::MissingPendingReq1 => h.missing_req1_score,
274            Self::CookieMismatch => h.cookie_mismatch_score,
275            Self::ParseAnomalyDrop => h.parse_anomaly_score,
276        }
277    }
278}
279
280#[derive(Debug, Clone, Copy, PartialEq, Eq)]
281enum ControlAction {
282    None,
283    CloseSession {
284        remote_reason: Option<RemoteDisconnectReason>,
285        illegal_state: bool,
286    },
287}
288
289pub struct TransportServer {
290    socket: UdpSocket,
291    recv_buffer: Vec<u8>,
292    config: TransportConfig,
293    advertisement_bytes: Bytes,
294    rate_limiter: RateLimiter,
295    proxy_router: Option<Arc<dyn ProxyRouter>>,
296    cookie_key_current: SecretCookieKey,
297    cookie_key_previous: Option<SecretCookieKey>,
298    next_cookie_rotation: Instant,
299    sessions: HashMap<SocketAddr, Session>,
300    session_pipelines: HashMap<SocketAddr, SessionPipeline>,
301    pending_handshakes: HashMap<SocketAddr, PendingHandshake>,
302    sessions_started_total: u64,
303    sessions_closed_total: u64,
304    packets_forwarded_total: u64,
305    bytes_forwarded_total: u64,
306    illegal_state_transitions: u64,
307    timed_out_sessions: u64,
308    local_requested_disconnects: u64,
309    remote_disconnect_notifications: u64,
310    remote_detect_lost_disconnects: u64,
311    keepalive_pings_sent: u64,
312    cookie_rotations: u64,
313    cookie_mismatch_drops: u64,
314    cookie_mismatch_blocks: u64,
315    handshake_stage_cancel_drops: u64,
316    handshake_req1_req2_timeouts: u64,
317    handshake_reply2_connect_timeouts: u64,
318    handshake_missing_req1_drops: u64,
319    handshake_auto_blocks: u64,
320    handshake_already_connected_rejects: u64,
321    handshake_ip_recently_connected_rejects: u64,
322    request2_server_addr_mismatch_drops: u64,
323    handshake_heuristics: HashMap<IpAddr, HandshakeHeuristicState>,
324    cookie_mismatch_guard_states: HashMap<IpAddr, CookieMismatchGuardState>,
325    ip_recently_connected_until: HashMap<IpAddr, Instant>,
326    request2_legacy_parse_hits: u64,
327    request2_legacy_drops: u64,
328    request2_ambiguous_parse_hits: u64,
329    request2_ambiguous_drops: u64,
330    proxy_inbound_reroutes: u64,
331    proxy_inbound_drops: u64,
332    proxy_outbound_reroutes: u64,
333    proxy_outbound_drops: u64,
334}
335
336impl TransportServer {
/// Reports whether this build target is one of the operating systems on
/// which `bind_socket` applies `SO_REUSEPORT` (Linux, Android, macOS, iOS,
/// FreeBSD, NetBSD, OpenBSD).
pub const fn supports_reuse_port_sharded_bind() -> bool {
    const TARGET_HAS_REUSE_PORT: bool = cfg!(any(
        target_os = "linux",
        target_os = "android",
        target_os = "macos",
        target_os = "ios",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd"
    ));
    TARGET_HAS_REUSE_PORT
}
348
349    pub async fn bind(config: TransportConfig) -> io::Result<Self> {
350        config.validate().map_err(invalid_config_io_error)?;
351        if config.split_ipv4_ipv6_bind {
352            return Err(io::Error::new(
353                io::ErrorKind::InvalidInput,
354                "split_ipv4_ipv6_bind requires bind_shards(); use shard_count >= 1",
355            ));
356        }
357        let socket = Self::bind_socket(
358            config.bind_addr,
359            config.reuse_port,
360            config.ipv6_only,
361            config.socket_tuning,
362        )
363        .await?;
364        Self::with_socket(config, socket)
365    }
366
367    pub async fn bind_shards(config: TransportConfig, shard_count: usize) -> io::Result<Vec<Self>> {
368        config.validate().map_err(invalid_config_io_error)?;
369        let bind_plan = Self::build_shard_bind_plan(&config, shard_count.max(1));
370        if !config.reuse_port && Self::has_duplicate_bind_targets(&bind_plan) {
371            return Err(io::Error::new(
372                io::ErrorKind::InvalidInput,
373                "duplicate shard bind targets require transport_config.reuse_port = true",
374            ));
375        }
376
377        if Self::has_duplicate_bind_targets(&bind_plan) && !Self::supports_reuse_port_sharded_bind()
378        {
379            return Self::bind_shards_with_shared_socket(config, bind_plan).await;
380        }
381
382        let mut workers = Vec::with_capacity(bind_plan.len());
383        for bind_addr in bind_plan {
384            let mut worker_config = config.clone();
385            worker_config.bind_addr = bind_addr;
386            let socket = Self::bind_socket(
387                bind_addr,
388                worker_config.reuse_port,
389                worker_config.ipv6_only,
390                worker_config.socket_tuning,
391            )
392            .await?;
393            workers.push(Self::with_socket(worker_config, socket)?);
394        }
395        Ok(workers)
396    }
397
398    fn build_shard_bind_plan(config: &TransportConfig, shard_count: usize) -> Vec<SocketAddr> {
399        let targets = Self::bind_targets(config);
400        let effective_count = if config.split_ipv4_ipv6_bind {
401            shard_count.max(targets.len())
402        } else {
403            shard_count
404        };
405        let mut plan = Vec::with_capacity(effective_count);
406        for idx in 0..effective_count {
407            plan.push(targets[idx % targets.len()]);
408        }
409        plan
410    }
411
412    fn bind_targets(config: &TransportConfig) -> Vec<SocketAddr> {
413        if !config.split_ipv4_ipv6_bind {
414            return vec![config.bind_addr];
415        }
416
417        let port = config.bind_addr.port();
418        let v4 = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port));
419        let v6 = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, 0));
420        match config.bind_addr {
421            SocketAddr::V4(_) => vec![v4, v6],
422            SocketAddr::V6(_) => vec![v6, v4],
423        }
424    }
425
/// Returns `true` when any address occurs more than once in `bind_plan`.
fn has_duplicate_bind_targets(bind_plan: &[SocketAddr]) -> bool {
    let mut seen = HashSet::with_capacity(bind_plan.len());
    // `insert` returns false for an address that is already present.
    bind_plan.iter().any(|addr| !seen.insert(*addr))
}
435
436    async fn bind_shards_with_shared_socket(
437        config: TransportConfig,
438        bind_plan: Vec<SocketAddr>,
439    ) -> io::Result<Vec<Self>> {
440        let mut base_sockets: HashMap<SocketAddr, std::net::UdpSocket> = HashMap::with_capacity(2);
441        let mut workers = Vec::with_capacity(bind_plan.len());
442        for bind_addr in bind_plan {
443            let socket = if let Some(base_socket) = base_sockets.get(&bind_addr) {
444                let clone = base_socket.try_clone()?;
445                clone.set_nonblocking(true)?;
446                UdpSocket::from_std(clone)?
447            } else {
448                let base_socket =
449                    Self::bind_socket(bind_addr, false, config.ipv6_only, config.socket_tuning)
450                        .await?;
451                let base_std = base_socket.into_std()?;
452                base_std.set_nonblocking(true)?;
453                let worker_std = base_std.try_clone()?;
454                worker_std.set_nonblocking(true)?;
455                base_sockets.insert(bind_addr, base_std);
456                UdpSocket::from_std(worker_std)?
457            };
458
459            let mut worker_config = config.clone();
460            worker_config.bind_addr = bind_addr;
461            workers.push(Self::with_socket(worker_config, socket)?);
462        }
463
464        Ok(workers)
465    }
466
467    fn with_socket(config: TransportConfig, socket: UdpSocket) -> io::Result<Self> {
468        let mut rate_limiter = RateLimiter::new(
469            config.per_ip_packet_limit,
470            config.global_packet_limit,
471            config.rate_window,
472            config.block_duration,
473        );
474        rate_limiter.set_processing_budget_config(config.processing_budget);
475        for ip in &config.rate_limit_exceptions {
476            rate_limiter.add_exception(*ip);
477        }
478        let now = Instant::now();
479        let cookie_key_current = SecretCookieKey::new(random_cookie_key());
480        let next_cookie_rotation = now + config.cookie_rotation_interval;
481        let advertisement_bytes = Bytes::copy_from_slice(config.advertisement.as_bytes());
482
483        Ok(Self {
484            socket,
485            recv_buffer: vec![0u8; config.mtu.max(MAXIMUM_MTU_SIZE as usize).max(2048)],
486            config,
487            advertisement_bytes,
488            rate_limiter,
489            proxy_router: None,
490            cookie_key_current,
491            cookie_key_previous: None,
492            next_cookie_rotation,
493            sessions: HashMap::new(),
494            session_pipelines: HashMap::new(),
495            pending_handshakes: HashMap::new(),
496            sessions_started_total: 0,
497            sessions_closed_total: 0,
498            packets_forwarded_total: 0,
499            bytes_forwarded_total: 0,
500            illegal_state_transitions: 0,
501            timed_out_sessions: 0,
502            local_requested_disconnects: 0,
503            remote_disconnect_notifications: 0,
504            remote_detect_lost_disconnects: 0,
505            keepalive_pings_sent: 0,
506            cookie_rotations: 0,
507            cookie_mismatch_drops: 0,
508            cookie_mismatch_blocks: 0,
509            handshake_stage_cancel_drops: 0,
510            handshake_req1_req2_timeouts: 0,
511            handshake_reply2_connect_timeouts: 0,
512            handshake_missing_req1_drops: 0,
513            handshake_auto_blocks: 0,
514            handshake_already_connected_rejects: 0,
515            handshake_ip_recently_connected_rejects: 0,
516            request2_server_addr_mismatch_drops: 0,
517            handshake_heuristics: HashMap::new(),
518            cookie_mismatch_guard_states: HashMap::new(),
519            ip_recently_connected_until: HashMap::new(),
520            request2_legacy_parse_hits: 0,
521            request2_legacy_drops: 0,
522            request2_ambiguous_parse_hits: 0,
523            request2_ambiguous_drops: 0,
524            proxy_inbound_reroutes: 0,
525            proxy_inbound_drops: 0,
526            proxy_outbound_reroutes: 0,
527            proxy_outbound_drops: 0,
528        })
529    }
530
531    async fn bind_socket(
532        addr: SocketAddr,
533        reuse_port: bool,
534        ipv6_only: bool,
535        socket_tuning: TransportSocketTuning,
536    ) -> io::Result<UdpSocket> {
537        let domain = if addr.is_ipv4() {
538            Domain::IPV4
539        } else {
540            Domain::IPV6
541        };
542
543        let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
544        socket.set_reuse_address(true)?;
545        #[cfg(any(
546            target_os = "linux",
547            target_os = "android",
548            target_os = "macos",
549            target_os = "ios",
550            target_os = "freebsd",
551            target_os = "netbsd",
552            target_os = "openbsd"
553        ))]
554        if reuse_port {
555            socket.set_reuse_port(true)?;
556        }
557        #[cfg(not(any(
558            target_os = "linux",
559            target_os = "android",
560            target_os = "macos",
561            target_os = "ios",
562            target_os = "freebsd",
563            target_os = "netbsd",
564            target_os = "openbsd"
565        )))]
566        let _ = reuse_port;
567
568        if addr.is_ipv6() {
569            socket.set_only_v6(ipv6_only)?;
570        }
571        Self::apply_socket_tuning(&socket, addr, socket_tuning)?;
572        socket.set_nonblocking(true)?;
573        socket.bind(&addr.into())?;
574        let std_socket: std::net::UdpSocket = socket.into();
575        UdpSocket::from_std(std_socket)
576    }
577
578    fn apply_socket_tuning(
579        socket: &Socket,
580        addr: SocketAddr,
581        tuning: TransportSocketTuning,
582    ) -> io::Result<()> {
583        if let Some(size) = tuning.recv_buffer_size {
584            socket.set_recv_buffer_size(size)?;
585        }
586        if let Some(size) = tuning.send_buffer_size {
587            socket.set_send_buffer_size(size)?;
588        }
589
590        match addr {
591            SocketAddr::V4(_) => {
592                if let Some(ttl) = tuning.ipv4_ttl {
593                    socket.set_ttl_v4(ttl)?;
594                }
595                #[cfg(not(any(
596                    target_os = "fuchsia",
597                    target_os = "redox",
598                    target_os = "solaris",
599                    target_os = "illumos",
600                    target_os = "haiku",
601                )))]
602                if let Some(tos) = tuning.ipv4_tos {
603                    socket.set_tos_v4(tos)?;
604                }
605
606                if tuning.disable_ip_fragmentation {
607                    set_df_path_mtu_discovery_v4(socket)?;
608                }
609            }
610            SocketAddr::V6(_) => {
611                if let Some(hops) = tuning.ipv6_unicast_hops {
612                    socket.set_unicast_hops_v6(hops)?;
613                }
614                if tuning.disable_ip_fragmentation {
615                    set_df_path_mtu_discovery_v6(socket)?;
616                }
617            }
618        }
619
620        Ok(())
621    }
622
623    pub fn local_addr(&self) -> io::Result<SocketAddr> {
624        self.socket.local_addr()
625    }
626
627    pub fn session_count(&self) -> usize {
628        self.sessions.len()
629    }
630
631    pub async fn recv_and_process(&mut self) -> io::Result<TransportEvent> {
632        let (len, observed_addr) = self.socket.recv_from(&mut self.recv_buffer).await?;
633        let now = Instant::now();
634        let local_addr = self.local_addr().unwrap_or(self.config.bind_addr);
635        let addr = if let Some(router) = &self.proxy_router {
636            match router.route_inbound(observed_addr, local_addr) {
637                InboundProxyRoute::Local { session_addr } => {
638                    if session_addr != observed_addr {
639                        self.proxy_inbound_reroutes = self.proxy_inbound_reroutes.saturating_add(1);
640                    }
641                    session_addr
642                }
643                InboundProxyRoute::Drop => {
644                    self.proxy_inbound_drops = self.proxy_inbound_drops.saturating_add(1);
645                    return Ok(TransportEvent::ProxyDropped {
646                        addr: observed_addr,
647                    });
648                }
649            }
650        } else {
651            observed_addr
652        };
653
654        self.prune_pending_handshakes(now);
655        self.prune_cookie_mismatch_guard_states(now);
656        self.prune_ip_recently_connected(now);
657        self.prune_idle_sessions(now, Some(addr));
658        self.rate_limiter.tick(now);
659        self.rotate_cookie_keys_if_needed(now);
660
661        let slice = &self.recv_buffer[..len];
662        let Some(first) = slice.first().copied() else {
663            return Ok(TransportEvent::DecodeError {
664                addr,
665                error: DecodeError::UnexpectedEof,
666            });
667        };
668
669        let is_offline = is_offline_packet_id(first);
670        match self.rate_limiter.check(addr.ip(), now) {
671            RateLimitDecision::Allow => {}
672            RateLimitDecision::GlobalLimit => {
673                return Ok(TransportEvent::RateLimited { addr });
674            }
675            RateLimitDecision::IpBlocked { newly_blocked, .. } => {
676                if is_offline && newly_blocked {
677                    self.send_connection_banned(addr).await?;
678                }
679                return Ok(TransportEvent::RateLimited { addr });
680            }
681        }
682
683        if is_offline {
684            let mut payload = slice;
685            let packet =
686                match OfflinePacket::decode_with_magic(&mut payload, self.config.unconnected_magic)
687                {
688                    Ok(packet) => packet,
689                    Err(error) => return Ok(TransportEvent::DecodeError { addr, error }),
690                };
691
692            if let Some(event) = self.handle_offline_packet(addr, &packet, now).await? {
693                return Ok(event);
694            }
695
696            return Ok(TransportEvent::OfflinePacket { addr, packet });
697        }
698
699        let mut payload = slice;
700        let datagram = match Datagram::decode(&mut payload) {
701            Ok(d) => d,
702            Err(error) => return Ok(TransportEvent::DecodeError { addr, error }),
703        };
704
705        if !self.sessions.contains_key(&addr) && !self.pending_handshakes.contains_key(&addr) {
706            return Ok(TransportEvent::ConnectedDatagramDroppedNoSession { addr });
707        }
708
709        if !self.sessions.contains_key(&addr) && self.sessions.len() >= self.config.max_sessions {
710            return Ok(TransportEvent::SessionLimitReached { addr });
711        }
712
713        if matches!(&datagram.payload, DatagramPayload::Frames(_))
714            && !self.rate_limiter.is_exception(addr.ip())
715        {
716            let processing_cost = estimate_connected_datagram_processing_cost(len, &datagram);
717            let budget_decision =
718                self.rate_limiter
719                    .consume_processing_budget(addr.ip(), processing_cost, now);
720            match budget_decision {
721                ProcessingBudgetDecision::Allow => {}
722                ProcessingBudgetDecision::IpExhausted
723                | ProcessingBudgetDecision::GlobalExhausted => {
724                    warn!(
725                        %addr,
726                        cost_units = processing_cost,
727                        decision = ?budget_decision,
728                        "dropping connected datagram due to processing budget exhaustion"
729                    );
730                    return Ok(TransportEvent::RateLimited { addr });
731                }
732            }
733        }
734
735        let server_addr = self.local_addr().unwrap_or(self.config.bind_addr);
736        let now_millis = unix_timestamp_millis();
737        let unhandled_queue_max_frames = self.config.unhandled_queue_max_frames;
738        let unhandled_queue_max_bytes = self.config.unhandled_queue_max_bytes;
739
740        let mut decode_error: Option<DecodeError> = None;
741        let mut close_session = false;
742        let mut illegal_transition = false;
743        let mut remote_disconnect_reason: Option<RemoteDisconnectReason> = None;
744
745        let (mut frames, receipts, datagrams_to_send, became_connected) = {
746            let session_tunables = self.config.session_tunables.clone();
747            let session = self
748                .sessions
749                .entry(addr)
750                .or_insert_with(|| Session::with_tunables(self.config.mtu, session_tunables));
751            let pipeline = self.session_pipelines.entry(addr).or_insert_with(|| {
752                SessionPipeline::new(unhandled_queue_max_frames, unhandled_queue_max_bytes)
753            });
754            session.touch_activity(now);
755
756            let frames = match session.ingest_datagram(datagram, now) {
757                Ok(frames) => frames,
758                Err(error) => return Ok(TransportEvent::DecodeError { addr, error }),
759            };
760
761            let receipts = session.process_incoming_receipts(now);
762            let mut app_frames = Vec::new();
763            let was_connected = session.state() == SessionState::Connected;
764
765            for frame in frames {
766                let Some(id) = frame.payload.first().copied() else {
767                    continue;
768                };
769
770                if !is_connected_control_id(id) {
771                    match pipeline.route_inbound_app_frame(session.state(), frame, &mut app_frames)
772                    {
773                        PipelineFrameAction::Deliver | PipelineFrameAction::Queued => {}
774                        PipelineFrameAction::Overflow => {
775                            close_session = true;
776                            break;
777                        }
778                    }
779                    continue;
780                }
781
782                let mut control_payload = &frame.payload[..];
783                let control_packet = match ConnectedControlPacket::decode(&mut control_payload) {
784                    Ok(pkt) => pkt,
785                    Err(error) => {
786                        decode_error = Some(error);
787                        break;
788                    }
789                };
790
791                match Self::apply_connected_control(
792                    session,
793                    addr,
794                    server_addr,
795                    now_millis,
796                    control_packet,
797                ) {
798                    Ok(ControlAction::None) => {}
799                    Ok(ControlAction::CloseSession {
800                        remote_reason,
801                        illegal_state,
802                    }) => {
803                        close_session = true;
804                        illegal_transition = illegal_state;
805                        if remote_disconnect_reason.is_none() {
806                            remote_disconnect_reason = remote_reason;
807                        }
808                        break;
809                    }
810                    Err(error) => {
811                        decode_error = Some(error);
812                        break;
813                    }
814                }
815            }
816
817            if !close_session && decode_error.is_none() {
818                let _ = pipeline.flush_if_connected(session.state(), &mut app_frames);
819            }
820
821            let immediate_datagrams = if close_session {
822                Vec::new()
823            } else {
824                session.on_tick(
825                    now,
826                    RECV_PATH_MAX_NEW_DATAGRAMS,
827                    self.config.mtu.saturating_mul(RECV_PATH_MAX_NEW_DATAGRAMS),
828                    RECV_PATH_MAX_RESEND_DATAGRAMS,
829                    self.config
830                        .mtu
831                        .saturating_mul(RECV_PATH_MAX_RESEND_DATAGRAMS),
832                )
833            };
834            if session.take_backpressure_disconnect() {
835                close_session = true;
836            }
837
838            let app_frames: Vec<ConnectedFrameDelivery> = app_frames
839                .into_iter()
840                .map(ConnectedFrameDelivery::from_frame)
841                .collect();
842            let became_connected = !was_connected && session.state() == SessionState::Connected;
843
844            (app_frames, receipts, immediate_datagrams, became_connected)
845        };
846
847        if let Some(error) = decode_error {
848            return Ok(TransportEvent::DecodeError { addr, error });
849        }
850
851        if illegal_transition {
852            self.illegal_state_transitions = self.illegal_state_transitions.saturating_add(1);
853        } else {
854            for datagram in &datagrams_to_send {
855                self.send_datagram(addr, datagram).await?;
856            }
857        }
858
859        if close_session {
860            frames.clear();
861            if let Some(reason) = remote_disconnect_reason {
862                self.record_remote_disconnect(reason);
863            }
864            self.close_session(addr);
865            if let Some(reason) = remote_disconnect_reason {
866                return Ok(TransportEvent::PeerDisconnected { addr, reason });
867            }
868        }
869
870        if became_connected {
871            self.sessions_started_total = self.sessions_started_total.saturating_add(1);
872        }
873
874        let client_guid = if became_connected {
875            self.pending_handshakes
876                .get(&addr)
877                .and_then(|pending| pending.client_guid)
878        } else {
879            None
880        };
881
882        if self
883            .sessions
884            .get(&addr)
885            .is_some_and(|session| session.state() == SessionState::Connected)
886        {
887            self.pending_handshakes.remove(&addr);
888        }
889
890        let frame_count = frames.len();
891        let forwarded_bytes = frames
892            .iter()
893            .map(|frame| frame.payload.len() as u64)
894            .sum::<u64>();
895        self.packets_forwarded_total = self
896            .packets_forwarded_total
897            .saturating_add(frame_count as u64);
898        self.bytes_forwarded_total = self.bytes_forwarded_total.saturating_add(forwarded_bytes);
899        Ok(TransportEvent::ConnectedFrames {
900            addr,
901            client_guid,
902            frames,
903            frame_count,
904            receipts,
905        })
906    }
907
908    pub fn config(&self) -> &TransportConfig {
909        &self.config
910    }
911
912    pub fn set_proxy_router(&mut self, router: Arc<dyn ProxyRouter>) {
913        self.proxy_router = Some(router);
914    }
915
916    pub fn clear_proxy_router(&mut self) {
917        self.proxy_router = None;
918    }
919
920    pub fn add_rate_limit_exception(&mut self, ip: IpAddr) {
921        self.rate_limiter.add_exception(ip);
922    }
923
924    pub fn remove_rate_limit_exception(&mut self, ip: IpAddr) {
925        self.rate_limiter.remove_exception(ip);
926    }
927
928    pub fn rate_limit_config(&self) -> TransportRateLimitConfig {
929        let cfg: RateLimiterConfigSnapshot = self.rate_limiter.config_snapshot();
930        TransportRateLimitConfig {
931            per_ip_packet_limit: cfg.per_ip_limit,
932            global_packet_limit: cfg.global_limit,
933            rate_window: cfg.window,
934            block_duration: cfg.block_duration,
935        }
936    }
937
938    pub fn processing_budget_config(&self) -> TransportProcessingBudgetConfig {
939        let cfg = self.rate_limiter.processing_budget_config();
940        TransportProcessingBudgetConfig {
941            enabled: cfg.enabled,
942            per_ip_refill_units_per_sec: cfg.per_ip_refill_units_per_sec,
943            per_ip_burst_units: cfg.per_ip_burst_units,
944            global_refill_units_per_sec: cfg.global_refill_units_per_sec,
945            global_burst_units: cfg.global_burst_units,
946            bucket_idle_ttl: cfg.bucket_idle_ttl,
947        }
948    }
949
950    pub fn set_processing_budget_config(&mut self, config: TransportProcessingBudgetConfig) {
951        let raw = ProcessingBudgetConfig {
952            enabled: config.enabled,
953            per_ip_refill_units_per_sec: config.per_ip_refill_units_per_sec,
954            per_ip_burst_units: config.per_ip_burst_units,
955            global_refill_units_per_sec: config.global_refill_units_per_sec,
956            global_burst_units: config.global_burst_units,
957            bucket_idle_ttl: config.bucket_idle_ttl,
958        };
959        self.rate_limiter.set_processing_budget_config(raw);
960        self.config.processing_budget = self.rate_limiter.processing_budget_config();
961    }
962
963    pub fn set_rate_limit_config(&mut self, config: TransportRateLimitConfig) {
964        self.rate_limiter.update_limits(
965            config.per_ip_packet_limit,
966            config.global_packet_limit,
967            config.rate_window,
968            config.block_duration,
969        );
970        let effective = self.rate_limit_config();
971        self.config.per_ip_packet_limit = effective.per_ip_packet_limit;
972        self.config.global_packet_limit = effective.global_packet_limit;
973        self.config.rate_window = effective.rate_window;
974        self.config.block_duration = effective.block_duration;
975    }
976
977    pub fn set_per_ip_packet_limit(&mut self, limit: usize) {
978        self.rate_limiter.set_per_ip_limit(limit);
979        self.config.per_ip_packet_limit = self.rate_limit_config().per_ip_packet_limit;
980    }
981
982    pub fn set_global_packet_limit(&mut self, limit: usize) {
983        self.rate_limiter.set_global_limit(limit);
984        self.config.global_packet_limit = self.rate_limit_config().global_packet_limit;
985    }
986
987    pub fn set_rate_window(&mut self, window: Duration) {
988        self.rate_limiter.set_window(window);
989        self.config.rate_window = self.rate_limit_config().rate_window;
990    }
991
992    pub fn set_block_duration(&mut self, block_duration: Duration) {
993        self.rate_limiter.set_block_duration(block_duration);
994        self.config.block_duration = self.rate_limit_config().block_duration;
995    }
996
997    pub fn block_address(&mut self, ip: IpAddr) -> bool {
998        self.rate_limiter.block_address(ip)
999    }
1000
1001    pub fn block_address_for(&mut self, ip: IpAddr, duration: Duration) -> bool {
1002        self.rate_limiter
1003            .block_address_for(ip, Instant::now(), duration)
1004    }
1005
1006    pub fn unblock_address(&mut self, ip: IpAddr) -> bool {
1007        self.rate_limiter.unblock_address(ip)
1008    }
1009
1010    pub fn disconnect_peer(&mut self, addr: SocketAddr) -> bool {
1011        let exists =
1012            self.sessions.contains_key(&addr) || self.pending_handshakes.contains_key(&addr);
1013        if exists {
1014            self.local_requested_disconnects = self.local_requested_disconnects.saturating_add(1);
1015            self.close_session(addr);
1016        }
1017        exists
1018    }
1019
1020    pub fn metrics_snapshot(&self) -> TransportMetricsSnapshot {
1021        let mut total = TransportMetricsSnapshot {
1022            session_count: self.sessions.len(),
1023            sessions_started_total: self.sessions_started_total,
1024            sessions_closed_total: self.sessions_closed_total,
1025            packets_forwarded_total: self.packets_forwarded_total,
1026            bytes_forwarded_total: self.bytes_forwarded_total,
1027            illegal_state_transitions: self.illegal_state_transitions,
1028            timed_out_sessions: self.timed_out_sessions,
1029            local_requested_disconnects: self.local_requested_disconnects,
1030            remote_disconnect_notifications: self.remote_disconnect_notifications,
1031            remote_detect_lost_disconnects: self.remote_detect_lost_disconnects,
1032            keepalive_pings_sent: self.keepalive_pings_sent,
1033            cookie_rotations: self.cookie_rotations,
1034            cookie_mismatch_drops: self.cookie_mismatch_drops,
1035            cookie_mismatch_blocks: self.cookie_mismatch_blocks,
1036            handshake_stage_cancel_drops: self.handshake_stage_cancel_drops,
1037            handshake_req1_req2_timeouts: self.handshake_req1_req2_timeouts,
1038            handshake_reply2_connect_timeouts: self.handshake_reply2_connect_timeouts,
1039            handshake_missing_req1_drops: self.handshake_missing_req1_drops,
1040            handshake_auto_blocks: self.handshake_auto_blocks,
1041            handshake_already_connected_rejects: self.handshake_already_connected_rejects,
1042            handshake_ip_recently_connected_rejects: self.handshake_ip_recently_connected_rejects,
1043            request2_server_addr_mismatch_drops: self.request2_server_addr_mismatch_drops,
1044            request2_legacy_parse_hits: self.request2_legacy_parse_hits,
1045            request2_legacy_drops: self.request2_legacy_drops,
1046            request2_ambiguous_parse_hits: self.request2_ambiguous_parse_hits,
1047            request2_ambiguous_drops: self.request2_ambiguous_drops,
1048            proxy_inbound_reroutes: self.proxy_inbound_reroutes,
1049            proxy_inbound_drops: self.proxy_inbound_drops,
1050            proxy_outbound_reroutes: self.proxy_outbound_reroutes,
1051            proxy_outbound_drops: self.proxy_outbound_drops,
1052            ..TransportMetricsSnapshot::default()
1053        };
1054        let mut srtt_sum = 0.0;
1055        let mut rttvar_sum = 0.0;
1056        let mut resend_rto_sum = 0.0;
1057        let mut cwnd_sum = 0.0;
1058
1059        for session in self.sessions.values() {
1060            let s: SessionMetricsSnapshot = session.metrics_snapshot();
1061            total.pending_outgoing_frames += s.pending_outgoing_frames;
1062            total.pending_outgoing_bytes += s.pending_outgoing_bytes;
1063            total.ingress_datagrams = total.ingress_datagrams.saturating_add(s.ingress_datagrams);
1064            total.ingress_frames = total.ingress_frames.saturating_add(s.ingress_frames);
1065            total.duplicate_reliable_drops = total
1066                .duplicate_reliable_drops
1067                .saturating_add(s.duplicate_reliable_drops);
1068            total.ordered_stale_drops = total
1069                .ordered_stale_drops
1070                .saturating_add(s.ordered_stale_drops);
1071            total.ordered_buffer_full_drops = total
1072                .ordered_buffer_full_drops
1073                .saturating_add(s.ordered_buffer_full_drops);
1074            total.sequenced_stale_drops = total
1075                .sequenced_stale_drops
1076                .saturating_add(s.sequenced_stale_drops);
1077            total.sequenced_missing_index_drops = total
1078                .sequenced_missing_index_drops
1079                .saturating_add(s.sequenced_missing_index_drops);
1080            total.reliable_sent_datagrams = total
1081                .reliable_sent_datagrams
1082                .saturating_add(s.reliable_sent_datagrams);
1083            total.resent_datagrams = total.resent_datagrams.saturating_add(s.resent_datagrams);
1084            total.ack_out_total = total.ack_out_total.saturating_add(s.ack_out_datagrams);
1085            total.nack_out_total = total.nack_out_total.saturating_add(s.nack_out_datagrams);
1086            total.acked_datagrams = total.acked_datagrams.saturating_add(s.acked_datagrams);
1087            total.nacked_datagrams = total.nacked_datagrams.saturating_add(s.nacked_datagrams);
1088            total.split_ttl_drops = total.split_ttl_drops.saturating_add(s.split_ttl_drops);
1089            total.outgoing_queue_drops = total
1090                .outgoing_queue_drops
1091                .saturating_add(s.outgoing_queue_drops);
1092            total.outgoing_queue_defers = total
1093                .outgoing_queue_defers
1094                .saturating_add(s.outgoing_queue_defers);
1095            total.outgoing_queue_disconnects = total
1096                .outgoing_queue_disconnects
1097                .saturating_add(s.outgoing_queue_disconnects);
1098            total.backpressure_delays = total
1099                .backpressure_delays
1100                .saturating_add(s.backpressure_delays);
1101            total.backpressure_drops = total
1102                .backpressure_drops
1103                .saturating_add(s.backpressure_drops);
1104            total.backpressure_disconnects = total
1105                .backpressure_disconnects
1106                .saturating_add(s.backpressure_disconnects);
1107
1108            srtt_sum += s.srtt_ms;
1109            rttvar_sum += s.rttvar_ms;
1110            resend_rto_sum += s.resend_rto_ms;
1111            cwnd_sum += s.congestion_window_packets;
1112        }
1113
1114        for pipeline in self.session_pipelines.values() {
1115            let p: SessionPipelineMetricsSnapshot = pipeline.metrics_snapshot();
1116            total.pending_unhandled_frames += p.pending_unhandled_frames;
1117            total.pending_unhandled_bytes += p.pending_unhandled_bytes;
1118            total.unhandled_frames_queued = total
1119                .unhandled_frames_queued
1120                .saturating_add(p.unhandled_frames_queued);
1121            total.unhandled_frames_flushed = total
1122                .unhandled_frames_flushed
1123                .saturating_add(p.unhandled_frames_flushed);
1124            total.unhandled_frames_dropped = total
1125                .unhandled_frames_dropped
1126                .saturating_add(p.unhandled_frames_dropped);
1127        }
1128
1129        let r: RateLimiterMetricsSnapshot = self.rate_limiter.metrics_snapshot();
1130        let p: ProcessingBudgetMetricsSnapshot =
1131            self.rate_limiter.processing_budget_metrics_snapshot();
1132        total.rate_global_limit_hits = r.global_limit_hits;
1133        total.rate_ip_block_hits = r.ip_block_hits;
1134        total.rate_ip_block_hits_rate_exceeded = r.ip_block_hits_rate_exceeded;
1135        total.rate_ip_block_hits_manual = r.ip_block_hits_manual;
1136        total.rate_ip_block_hits_handshake_heuristic = r.ip_block_hits_handshake_heuristic;
1137        total.rate_ip_block_hits_cookie_mismatch_guard = r.ip_block_hits_cookie_mismatch_guard;
1138        total.rate_addresses_blocked = r.addresses_blocked;
1139        total.rate_addresses_blocked_rate_exceeded = r.addresses_blocked_rate_exceeded;
1140        total.rate_addresses_blocked_manual = r.addresses_blocked_manual;
1141        total.rate_addresses_blocked_handshake_heuristic = r.addresses_blocked_handshake_heuristic;
1142        total.rate_addresses_blocked_cookie_mismatch_guard =
1143            r.addresses_blocked_cookie_mismatch_guard;
1144        total.rate_addresses_unblocked = r.addresses_unblocked;
1145        total.rate_blocked_addresses = r.blocked_addresses;
1146        total.rate_exception_addresses = r.exception_addresses;
1147        total.processing_budget_drops_total = p.drops_total;
1148        total.processing_budget_drops_ip_exhausted_total = p.drops_ip_exhausted;
1149        total.processing_budget_drops_global_exhausted_total = p.drops_global_exhausted;
1150        total.processing_budget_consumed_units_total = p.consumed_units_total;
1151        total.processing_budget_active_ip_buckets = p.active_ip_buckets;
1152
1153        if total.session_count > 0 {
1154            let denom = total.session_count as f64;
1155            total.avg_srtt_ms = srtt_sum / denom;
1156            total.avg_rttvar_ms = rttvar_sum / denom;
1157            total.avg_resend_rto_ms = resend_rto_sum / denom;
1158            total.avg_congestion_window_packets = cwnd_sum / denom;
1159        }
1160
1161        total.resend_ratio = if total.reliable_sent_datagrams == 0 {
1162            0.0
1163        } else {
1164            total.resent_datagrams as f64 / total.reliable_sent_datagrams as f64
1165        };
1166
1167        total
1168    }
1169
1170    pub async fn flush_resends(
1171        &mut self,
1172        max_per_session: usize,
1173        max_bytes_per_session: usize,
1174    ) -> io::Result<usize> {
1175        self.tick_outbound(0, 0, max_per_session, max_bytes_per_session)
1176            .await
1177    }
1178
1179    pub fn queue_payload(
1180        &mut self,
1181        addr: SocketAddr,
1182        payload: Bytes,
1183        reliability: Reliability,
1184        channel: u8,
1185        priority: RakPriority,
1186    ) -> QueueDispatchResult {
1187        self.queue_payload_with_optional_receipt(
1188            addr,
1189            payload,
1190            reliability,
1191            channel,
1192            priority,
1193            None,
1194        )
1195    }
1196
1197    pub fn queue_payload_with_receipt(
1198        &mut self,
1199        addr: SocketAddr,
1200        payload: Bytes,
1201        reliability: Reliability,
1202        channel: u8,
1203        priority: RakPriority,
1204        receipt_id: u64,
1205    ) -> QueueDispatchResult {
1206        self.queue_payload_with_optional_receipt(
1207            addr,
1208            payload,
1209            reliability,
1210            channel,
1211            priority,
1212            Some(receipt_id),
1213        )
1214    }
1215
1216    fn queue_payload_with_optional_receipt(
1217        &mut self,
1218        addr: SocketAddr,
1219        payload: Bytes,
1220        reliability: Reliability,
1221        channel: u8,
1222        priority: RakPriority,
1223        receipt_id: Option<u64>,
1224    ) -> QueueDispatchResult {
1225        let Some(session) = self.sessions.get_mut(&addr) else {
1226            return QueueDispatchResult::MissingSession;
1227        };
1228        let decision =
1229            session.queue_payload_with_receipt(payload, reliability, channel, priority, receipt_id);
1230        let disconnect = matches!(decision, QueuePayloadResult::DisconnectRequested)
1231            || session.take_backpressure_disconnect();
1232        if disconnect {
1233            self.close_session(addr);
1234            return QueueDispatchResult::Disconnected;
1235        }
1236
1237        match decision {
1238            QueuePayloadResult::Enqueued { reliable_bytes } => {
1239                QueueDispatchResult::Enqueued { reliable_bytes }
1240            }
1241            QueuePayloadResult::Dropped => QueueDispatchResult::Dropped,
1242            QueuePayloadResult::Deferred => QueueDispatchResult::Deferred,
1243            QueuePayloadResult::DisconnectRequested => QueueDispatchResult::Disconnected,
1244        }
1245    }
1246
1247    pub async fn tick_outbound(
1248        &mut self,
1249        max_new_datagrams_per_session: usize,
1250        max_new_bytes_per_session: usize,
1251        max_resend_datagrams_per_session: usize,
1252        max_resend_bytes_per_session: usize,
1253    ) -> io::Result<usize> {
1254        let now = Instant::now();
1255        self.prune_pending_handshakes(now);
1256        self.prune_cookie_mismatch_guard_states(now);
1257        self.prune_ip_recently_connected(now);
1258        self.prune_idle_sessions(now, None);
1259        self.queue_keepalive_pings(now);
1260        self.rate_limiter.tick(now);
1261        self.rotate_cookie_keys_if_needed(now);
1262
1263        let mut pending = Vec::new();
1264
1265        for (addr, session) in &mut self.sessions {
1266            let datagrams = session.on_tick(
1267                now,
1268                max_new_datagrams_per_session,
1269                max_new_bytes_per_session,
1270                max_resend_datagrams_per_session,
1271                max_resend_bytes_per_session,
1272            );
1273            for d in datagrams {
1274                pending.push((*addr, d));
1275            }
1276        }
1277
1278        for (addr, datagram) in &pending {
1279            self.send_datagram(*addr, datagram).await?;
1280        }
1281
1282        Ok(pending.len())
1283    }
1284
1285    pub async fn send_datagram(&mut self, addr: SocketAddr, datagram: &Datagram) -> io::Result<()> {
1286        let Some(target_addr) = self.route_outbound_target(addr) else {
1287            return Ok(());
1288        };
1289        let mut out = BytesMut::with_capacity(datagram.encoded_size());
1290        datagram.encode(&mut out).map_err(invalid_data_io_error)?;
1291        let _written = self.socket.send_to(&out, target_addr).await?;
1292        Ok(())
1293    }
1294
1295    async fn send_offline_packet(
1296        &mut self,
1297        addr: SocketAddr,
1298        packet: &OfflinePacket,
1299    ) -> io::Result<()> {
1300        let Some(target_addr) = self.route_outbound_target(addr) else {
1301            return Ok(());
1302        };
1303        let mut out = BytesMut::new();
1304        packet.encode(&mut out).map_err(invalid_data_io_error)?;
1305        let _written = self.socket.send_to(&out, target_addr).await?;
1306        Ok(())
1307    }
1308
1309    async fn send_no_free_incoming_connections(&mut self, addr: SocketAddr) -> io::Result<()> {
1310        let packet = OfflinePacket::NoFreeIncomingConnections(NoFreeIncomingConnections {
1311            server_guid: self.config.server_guid,
1312            magic: self.config.unconnected_magic,
1313        });
1314        self.send_offline_packet(addr, &packet).await
1315    }
1316
1317    async fn send_connection_banned(&mut self, addr: SocketAddr) -> io::Result<()> {
1318        let packet = OfflinePacket::ConnectionBanned(ConnectionBanned {
1319            server_guid: self.config.server_guid,
1320            magic: self.config.unconnected_magic,
1321        });
1322        self.send_offline_packet(addr, &packet).await
1323    }
1324
1325    async fn send_already_connected(&mut self, addr: SocketAddr) -> io::Result<()> {
1326        let packet = OfflinePacket::AlreadyConnected(AlreadyConnected {
1327            server_guid: self.config.server_guid,
1328            magic: self.config.unconnected_magic,
1329        });
1330        self.send_offline_packet(addr, &packet).await
1331    }
1332
1333    async fn send_ip_recently_connected(&mut self, addr: SocketAddr) -> io::Result<()> {
1334        let packet = OfflinePacket::IpRecentlyConnected(IpRecentlyConnected {
1335            server_guid: self.config.server_guid,
1336            magic: self.config.unconnected_magic,
1337        });
1338        self.send_offline_packet(addr, &packet).await
1339    }
1340
1341    fn route_outbound_target(&mut self, addr: SocketAddr) -> Option<SocketAddr> {
1342        let Some(router) = self.proxy_router.as_ref() else {
1343            return Some(addr);
1344        };
1345
1346        let local_addr = self.local_addr().unwrap_or(self.config.bind_addr);
1347        match router.route_outbound(addr, local_addr) {
1348            OutboundProxyRoute::Send { target_addr } => {
1349                if target_addr != addr {
1350                    self.proxy_outbound_reroutes = self.proxy_outbound_reroutes.saturating_add(1);
1351                }
1352                Some(target_addr)
1353            }
1354            OutboundProxyRoute::Drop => {
1355                self.proxy_outbound_drops = self.proxy_outbound_drops.saturating_add(1);
1356                None
1357            }
1358        }
1359    }
1360
1361    async fn handle_offline_packet(
1362        &mut self,
1363        addr: SocketAddr,
1364        packet: &OfflinePacket,
1365        now: Instant,
1366    ) -> io::Result<Option<TransportEvent>> {
1367        match packet {
1368            OfflinePacket::UnconnectedPing(ping)
1369            | OfflinePacket::UnconnectedPingOpenConnections(ping) => {
1370                let pong = OfflinePacket::UnconnectedPong(UnconnectedPong {
1371                    ping_time: ping.ping_time,
1372                    server_guid: self.config.server_guid,
1373                    magic: self.config.unconnected_magic,
1374                    motd: self.advertisement_bytes.clone(),
1375                });
1376                self.send_offline_packet(addr, &pong).await?;
1377            }
1378            OfflinePacket::OpenConnectionRequest1(req1) => {
1379                if !supports_protocol(&self.config.supported_protocols, req1.protocol_version) {
1380                    warn!(
1381                        %addr,
1382                        protocol_version = req1.protocol_version,
1383                        "rejecting request1: incompatible protocol version"
1384                    );
1385                    let incompatible =
1386                        OfflinePacket::IncompatibleProtocolVersion(IncompatibleProtocolVersion {
1387                            protocol_version: primary_protocol_version(
1388                                &self.config.supported_protocols,
1389                            ),
1390                            server_guid: self.config.server_guid,
1391                            magic: self.config.unconnected_magic,
1392                        });
1393                    self.send_offline_packet(addr, &incompatible).await?;
1394                    return Ok(None);
1395                }
1396
1397                if self.is_ip_recently_connected(addr, now) {
1398                    self.handshake_ip_recently_connected_rejects = self
1399                        .handshake_ip_recently_connected_rejects
1400                        .saturating_add(1);
1401                    warn!(%addr, "rejecting request1: ip recently connected");
1402                    self.send_ip_recently_connected(addr).await?;
1403                    return Ok(None);
1404                }
1405
1406                if self.has_active_session_for_offline_reject(addr) {
1407                    self.handshake_already_connected_rejects =
1408                        self.handshake_already_connected_rejects.saturating_add(1);
1409                    warn!(%addr, "rejecting request1: already connected");
1410                    self.send_already_connected(addr).await?;
1411                    return Ok(None);
1412                }
1413
1414                if self.pending_handshakes.get(&addr).is_some_and(|pending| {
1415                    pending.stage == PendingHandshakeStage::AwaitingConnectionRequest
1416                }) {
1417                    self.handshake_already_connected_rejects =
1418                        self.handshake_already_connected_rejects.saturating_add(1);
1419                    warn!(%addr, "rejecting request1: handshake already in progress");
1420                    self.send_already_connected(addr).await?;
1421                    return Ok(None);
1422                }
1423
1424                if self.would_exceed_session_limit(addr) {
1425                    warn!(%addr, "rejecting request1: session limit reached");
1426                    self.send_no_free_incoming_connections(addr).await?;
1427                    return Ok(Some(TransportEvent::SessionLimitReached { addr }));
1428                }
1429
1430                let mtu = self.negotiate_mtu(req1.mtu);
1431                let previous_cookie = self.pending_handshakes.get(&addr).and_then(|pending| {
1432                    if pending.stage == PendingHandshakeStage::AwaitingRequest2 {
1433                        pending.cookie
1434                    } else {
1435                        None
1436                    }
1437                });
1438                let cookie = if self.config.send_cookie {
1439                    previous_cookie.or_else(|| Some(self.generate_cookie(addr)))
1440                } else {
1441                    None
1442                };
1443
1444                self.pending_handshakes.insert(
1445                    addr,
1446                    PendingHandshake {
1447                        mtu,
1448                        cookie,
1449                        client_guid: None,
1450                        stage: PendingHandshakeStage::AwaitingRequest2,
1451                        expires_at: now + self.config.handshake_req1_req2_timeout(),
1452                    },
1453                );
1454
1455                self.sessions.remove(&addr);
1456                self.session_pipelines.remove(&addr);
1457                self.sessions.insert(
1458                    addr,
1459                    Session::with_tunables(mtu as usize, self.config.session_tunables.clone()),
1460                );
1461                let mut valid_transition = true;
1462                if let Some(session) = self.sessions.get_mut(&addr) {
1463                    valid_transition = Self::apply_session_transitions(
1464                        session,
1465                        &[SessionState::Req1Recv, SessionState::Reply1Sent],
1466                    );
1467                }
1468                if !valid_transition {
1469                    self.record_illegal_state_transition(addr);
1470                    return Ok(None);
1471                }
1472
1473                let reply = OfflinePacket::OpenConnectionReply1(OpenConnectionReply1 {
1474                    server_guid: self.config.server_guid,
1475                    mtu,
1476                    cookie,
1477                    magic: self.config.unconnected_magic,
1478                });
1479                self.send_offline_packet(addr, &reply).await?;
1480            }
1481            OfflinePacket::OpenConnectionRequest2(req2) => {
1482                if self.is_ip_recently_connected(addr, now) {
1483                    self.handshake_ip_recently_connected_rejects = self
1484                        .handshake_ip_recently_connected_rejects
1485                        .saturating_add(1);
1486                    warn!(%addr, "rejecting request2: ip recently connected");
1487                    self.send_ip_recently_connected(addr).await?;
1488                    return Ok(None);
1489                }
1490
1491                if self.has_active_session_for_offline_reject(addr) {
1492                    self.handshake_already_connected_rejects =
1493                        self.handshake_already_connected_rejects.saturating_add(1);
1494                    warn!(%addr, "rejecting request2: already connected");
1495                    self.send_already_connected(addr).await?;
1496                    return Ok(None);
1497                }
1498
1499                match req2.parse_path {
1500                    Request2ParsePath::LegacyHeuristic => {
1501                        self.request2_legacy_parse_hits =
1502                            self.request2_legacy_parse_hits.saturating_add(1);
1503                        if !self.config.allow_legacy_request2_fallback {
1504                            self.request2_legacy_drops =
1505                                self.request2_legacy_drops.saturating_add(1);
1506                            self.handshake_stage_cancel_drops =
1507                                self.handshake_stage_cancel_drops.saturating_add(1);
1508                            warn!(%addr, "dropping request2: legacy parse path disallowed");
1509                            let newly_blocked = self.record_handshake_violation(
1510                                addr,
1511                                HandshakeViolation::ParseAnomalyDrop,
1512                                now,
1513                            );
1514                            if newly_blocked {
1515                                self.send_connection_banned(addr).await?;
1516                            }
1517                            return Ok(None);
1518                        }
1519                    }
1520                    Request2ParsePath::AmbiguousPreferredNoCookie
1521                    | Request2ParsePath::AmbiguousPreferredWithCookie => {
1522                        self.request2_ambiguous_parse_hits =
1523                            self.request2_ambiguous_parse_hits.saturating_add(1);
1524                        if self.config.reject_ambiguous_request2 {
1525                            self.request2_ambiguous_drops =
1526                                self.request2_ambiguous_drops.saturating_add(1);
1527                            self.handshake_stage_cancel_drops =
1528                                self.handshake_stage_cancel_drops.saturating_add(1);
1529                            warn!(%addr, "dropping request2: ambiguous parse path rejected");
1530                            let newly_blocked = self.record_handshake_violation(
1531                                addr,
1532                                HandshakeViolation::ParseAnomalyDrop,
1533                                now,
1534                            );
1535                            if newly_blocked {
1536                                self.send_connection_banned(addr).await?;
1537                            }
1538                            return Ok(None);
1539                        }
1540                    }
1541                    Request2ParsePath::StrictNoCookie | Request2ParsePath::StrictWithCookie => {}
1542                }
1543
1544                let local_server_addr = self.local_addr().unwrap_or(self.config.bind_addr);
1545                if !self.is_request2_server_addr_allowed(req2.server_addr, local_server_addr) {
1546                    self.request2_server_addr_mismatch_drops =
1547                        self.request2_server_addr_mismatch_drops.saturating_add(1);
1548                    self.handshake_stage_cancel_drops =
1549                        self.handshake_stage_cancel_drops.saturating_add(1);
1550                    warn!(
1551                        %addr,
1552                        request_server_addr = %req2.server_addr,
1553                        local_server_addr = %local_server_addr,
1554                        "dropping request2: server_addr mismatch"
1555                    );
1556                    return Ok(None);
1557                }
1558
1559                let Some(pending) = self.pending_handshakes.get(&addr).copied() else {
1560                    self.handshake_missing_req1_drops =
1561                        self.handshake_missing_req1_drops.saturating_add(1);
1562                    self.handshake_stage_cancel_drops =
1563                        self.handshake_stage_cancel_drops.saturating_add(1);
1564                    warn!(%addr, "dropping request2: missing pending request1");
1565                    let newly_blocked = self.record_handshake_violation(
1566                        addr,
1567                        HandshakeViolation::MissingPendingReq1,
1568                        now,
1569                    );
1570                    if newly_blocked {
1571                        self.send_connection_banned(addr).await?;
1572                    }
1573                    return Ok(None);
1574                };
1575
1576                if pending.expires_at <= now {
1577                    match pending.stage {
1578                        PendingHandshakeStage::AwaitingRequest2 => {
1579                            self.handshake_req1_req2_timeouts =
1580                                self.handshake_req1_req2_timeouts.saturating_add(1);
1581                        }
1582                        PendingHandshakeStage::AwaitingConnectionRequest => {
1583                            self.handshake_reply2_connect_timeouts =
1584                                self.handshake_reply2_connect_timeouts.saturating_add(1);
1585                        }
1586                    }
1587                    self.handshake_stage_cancel_drops =
1588                        self.handshake_stage_cancel_drops.saturating_add(1);
1589                    warn!(
1590                        %addr,
1591                        stage = ?pending.stage,
1592                        "dropping request2: pending handshake timed out"
1593                    );
1594                    let violation = match pending.stage {
1595                        PendingHandshakeStage::AwaitingRequest2 => {
1596                            HandshakeViolation::Req1Req2Timeout
1597                        }
1598                        PendingHandshakeStage::AwaitingConnectionRequest => {
1599                            HandshakeViolation::Reply2ConnectTimeout
1600                        }
1601                    };
1602                    let newly_blocked = self.record_handshake_violation(addr, violation, now);
1603                    self.close_session(addr);
1604                    if newly_blocked {
1605                        self.send_connection_banned(addr).await?;
1606                    }
1607                    return Ok(None);
1608                }
1609
1610                if self.config.send_cookie
1611                    && (pending.cookie.is_none() || !self.verify_cookie(addr, req2.cookie))
1612                {
1613                    self.cookie_mismatch_drops = self.cookie_mismatch_drops.saturating_add(1);
1614                    self.handshake_stage_cancel_drops =
1615                        self.handshake_stage_cancel_drops.saturating_add(1);
1616                    warn!(%addr, "dropping request2: cookie verification failed");
1617                    let blocked_by_guard = self.record_cookie_mismatch(addr.ip(), now);
1618                    let blocked_by_heuristics = self.record_handshake_violation(
1619                        addr,
1620                        HandshakeViolation::CookieMismatch,
1621                        now,
1622                    );
1623                    if blocked_by_guard || blocked_by_heuristics {
1624                        self.send_connection_banned(addr).await?;
1625                    }
1626                    return Ok(None);
1627                }
1628                self.clear_cookie_mismatch_state(addr.ip());
1629
1630                if pending.stage == PendingHandshakeStage::AwaitingConnectionRequest {
1631                    let retry_mtu = self.negotiate_mtu(req2.mtu.min(pending.mtu));
1632                    let Some(session) = self.sessions.get_mut(&addr) else {
1633                        self.record_illegal_state_transition(addr);
1634                        return Ok(None);
1635                    };
1636                    if !is_post_reply2_handshake_state(session.state()) {
1637                        self.record_illegal_state_transition(addr);
1638                        return Ok(None);
1639                    }
1640                    session.set_mtu(retry_mtu as usize);
1641                    session.touch_activity(now);
1642
1643                    let reply = OfflinePacket::OpenConnectionReply2(OpenConnectionReply2 {
1644                        server_guid: self.config.server_guid,
1645                        server_addr: local_server_addr,
1646                        mtu: retry_mtu,
1647                        use_encryption: false,
1648                        magic: self.config.unconnected_magic,
1649                    });
1650                    self.send_offline_packet(addr, &reply).await?;
1651                    return Ok(None);
1652                }
1653
1654                if self.would_exceed_session_limit(addr) {
1655                    warn!(%addr, "rejecting request2: session limit reached");
1656                    self.send_no_free_incoming_connections(addr).await?;
1657                    return Ok(Some(TransportEvent::SessionLimitReached { addr }));
1658                }
1659
1660                let mtu = self.negotiate_mtu(req2.mtu.min(pending.mtu));
1661                let Some(session) = self.sessions.get_mut(&addr) else {
1662                    self.record_illegal_state_transition(addr);
1663                    return Ok(None);
1664                };
1665                session.set_mtu(mtu as usize);
1666                session.touch_activity(now);
1667                if !Self::apply_session_transitions(
1668                    session,
1669                    &[SessionState::Req2Recv, SessionState::Reply2Sent],
1670                ) {
1671                    self.record_illegal_state_transition(addr);
1672                    return Ok(None);
1673                }
1674                self.pending_handshakes.insert(
1675                    addr,
1676                    PendingHandshake {
1677                        mtu,
1678                        cookie: pending.cookie,
1679                        client_guid: Some(req2.client_guid),
1680                        stage: PendingHandshakeStage::AwaitingConnectionRequest,
1681                        expires_at: now + self.config.handshake_reply2_connect_timeout(),
1682                    },
1683                );
1684                let reply = OfflinePacket::OpenConnectionReply2(OpenConnectionReply2 {
1685                    server_guid: self.config.server_guid,
1686                    server_addr: local_server_addr,
1687                    mtu,
1688                    use_encryption: false,
1689                    magic: self.config.unconnected_magic,
1690                });
1691                self.send_offline_packet(addr, &reply).await?;
1692            }
1693            _ => {}
1694        }
1695
1696        Ok(None)
1697    }
1698
1699    fn apply_connected_control(
1700        session: &mut Session,
1701        addr: SocketAddr,
1702        server_addr: SocketAddr,
1703        now_millis: i64,
1704        packet: ConnectedControlPacket,
1705    ) -> Result<ControlAction, DecodeError> {
1706        match packet {
1707            ConnectedControlPacket::ConnectedPing(ping) => {
1708                if session.state() != SessionState::Connected {
1709                    return Ok(ControlAction::CloseSession {
1710                        remote_reason: None,
1711                        illegal_state: true,
1712                    });
1713                }
1714                let pong = ConnectedControlPacket::ConnectedPong(ConnectedPong {
1715                    ping_time: ping.ping_time,
1716                    pong_time: now_millis,
1717                });
1718                let queued = Self::queue_connected_control_packet(
1719                    session,
1720                    pong,
1721                    Reliability::Unreliable,
1722                    0,
1723                    RakPriority::Immediate,
1724                )?;
1725                if matches!(queued, QueuePayloadResult::DisconnectRequested)
1726                    || session.take_backpressure_disconnect()
1727                {
1728                    return Ok(ControlAction::CloseSession {
1729                        remote_reason: None,
1730                        illegal_state: false,
1731                    });
1732                }
1733            }
1734            ConnectedControlPacket::ConnectionRequest(request) => {
1735                if !Self::apply_session_transitions(session, &[SessionState::ConnReqRecv]) {
1736                    return Ok(ControlAction::CloseSession {
1737                        remote_reason: None,
1738                        illegal_state: true,
1739                    });
1740                }
1741
1742                let accepted =
1743                    ConnectedControlPacket::ConnectionRequestAccepted(ConnectionRequestAccepted {
1744                        client_addr: addr,
1745                        system_index: 0,
1746                        internal_addrs: build_internal_addrs(server_addr),
1747                        request_time: request.request_time,
1748                        accepted_time: now_millis,
1749                    });
1750
1751                let queued = Self::queue_connected_control_packet(
1752                    session,
1753                    accepted,
1754                    Reliability::ReliableOrdered,
1755                    0,
1756                    RakPriority::High,
1757                )?;
1758                if !matches!(queued, QueuePayloadResult::Enqueued { .. }) {
1759                    return Ok(ControlAction::CloseSession {
1760                        remote_reason: None,
1761                        illegal_state: false,
1762                    });
1763                }
1764                if session.take_backpressure_disconnect() {
1765                    return Ok(ControlAction::CloseSession {
1766                        remote_reason: None,
1767                        illegal_state: false,
1768                    });
1769                }
1770
1771                if !Self::apply_session_transitions(session, &[SessionState::ConnReqAcceptedSent]) {
1772                    return Ok(ControlAction::CloseSession {
1773                        remote_reason: None,
1774                        illegal_state: true,
1775                    });
1776                }
1777            }
1778            ConnectedControlPacket::NewIncomingConnection(_) => {
1779                if !Self::apply_session_transitions(
1780                    session,
1781                    &[SessionState::NewIncomingRecv, SessionState::Connected],
1782                ) {
1783                    return Ok(ControlAction::CloseSession {
1784                        remote_reason: None,
1785                        illegal_state: true,
1786                    });
1787                }
1788            }
1789            ConnectedControlPacket::DisconnectionNotification(pkt) => {
1790                if !Self::apply_session_transitions(
1791                    session,
1792                    &[SessionState::Closing, SessionState::Closed],
1793                ) {
1794                    return Ok(ControlAction::CloseSession {
1795                        remote_reason: None,
1796                        illegal_state: true,
1797                    });
1798                }
1799                return Ok(ControlAction::CloseSession {
1800                    remote_reason: Some(RemoteDisconnectReason::DisconnectionNotification {
1801                        reason_code: pkt.reason,
1802                    }),
1803                    illegal_state: false,
1804                });
1805            }
1806            ConnectedControlPacket::DetectLostConnection(_) => {
1807                if !Self::apply_session_transitions(
1808                    session,
1809                    &[SessionState::Closing, SessionState::Closed],
1810                ) {
1811                    return Ok(ControlAction::CloseSession {
1812                        remote_reason: None,
1813                        illegal_state: true,
1814                    });
1815                }
1816                return Ok(ControlAction::CloseSession {
1817                    remote_reason: Some(RemoteDisconnectReason::DetectLostConnection),
1818                    illegal_state: false,
1819                });
1820            }
1821            ConnectedControlPacket::ConnectedPong(_)
1822            | ConnectedControlPacket::ConnectionRequestAccepted(_) => {}
1823        }
1824
1825        Ok(ControlAction::None)
1826    }
1827
1828    fn apply_session_transitions(session: &mut Session, transitions: &[SessionState]) -> bool {
1829        for &next in transitions {
1830            if !session.transition_to(next) {
1831                return false;
1832            }
1833        }
1834        true
1835    }
1836
1837    fn queue_connected_control_packet(
1838        session: &mut Session,
1839        packet: ConnectedControlPacket,
1840        reliability: Reliability,
1841        channel: u8,
1842        priority: RakPriority,
1843    ) -> Result<QueuePayloadResult, DecodeError> {
1844        let mut bytes = BytesMut::new();
1845        packet
1846            .encode(&mut bytes)
1847            .map_err(|_| DecodeError::UnexpectedEof)?;
1848        let payload = bytes.freeze();
1849        Ok(session.queue_payload(payload, reliability, channel, priority))
1850    }
1851
1852    fn prune_pending_handshakes(&mut self, now: Instant) {
1853        let mut expired = Vec::new();
1854        for (addr, pending) in &self.pending_handshakes {
1855            if pending.expires_at <= now {
1856                expired.push((*addr, *pending));
1857            }
1858        }
1859
1860        for (addr, pending) in expired {
1861            let violation = match pending.stage {
1862                PendingHandshakeStage::AwaitingRequest2 => {
1863                    self.handshake_req1_req2_timeouts =
1864                        self.handshake_req1_req2_timeouts.saturating_add(1);
1865                    HandshakeViolation::Req1Req2Timeout
1866                }
1867                PendingHandshakeStage::AwaitingConnectionRequest => {
1868                    self.handshake_reply2_connect_timeouts =
1869                        self.handshake_reply2_connect_timeouts.saturating_add(1);
1870                    HandshakeViolation::Reply2ConnectTimeout
1871                }
1872            };
1873            warn!(
1874                %addr,
1875                stage = ?pending.stage,
1876                "pending handshake expired and session is being closed"
1877            );
1878            self.handshake_stage_cancel_drops = self.handshake_stage_cancel_drops.saturating_add(1);
1879            let _ = self.record_handshake_violation(addr, violation, now);
1880            self.close_session(addr);
1881        }
1882
1883        self.prune_handshake_heuristic_states(now);
1884    }
1885
1886    fn clear_cookie_mismatch_state(&mut self, ip: IpAddr) {
1887        self.cookie_mismatch_guard_states.remove(&ip);
1888    }
1889
1890    fn record_cookie_mismatch(&mut self, ip: IpAddr, now: Instant) -> bool {
1891        let guard = self.config.cookie_mismatch_guard;
1892        if !guard.enabled
1893            || guard.event_window.is_zero()
1894            || guard.block_duration.is_zero()
1895            || guard.mismatch_threshold == 0
1896        {
1897            return false;
1898        }
1899
1900        let state =
1901            self.cookie_mismatch_guard_states
1902                .entry(ip)
1903                .or_insert(CookieMismatchGuardState {
1904                    window_started_at: now,
1905                    mismatches: 0,
1906                });
1907
1908        if now.saturating_duration_since(state.window_started_at) > guard.event_window {
1909            state.window_started_at = now;
1910            state.mismatches = 0;
1911        }
1912
1913        state.mismatches = state.mismatches.saturating_add(1);
1914        if state.mismatches < guard.mismatch_threshold {
1915            return false;
1916        }
1917
1918        state.window_started_at = now;
1919        state.mismatches = 0;
1920
1921        let newly_blocked = self.rate_limiter.block_address_for_with_reason(
1922            ip,
1923            now,
1924            guard.block_duration,
1925            BlockReason::CookieMismatchGuard,
1926        );
1927        if newly_blocked {
1928            self.cookie_mismatch_blocks = self.cookie_mismatch_blocks.saturating_add(1);
1929            warn!(
1930                %ip,
1931                mismatches_required = guard.mismatch_threshold,
1932                block_secs = guard.block_duration.as_secs(),
1933                "cookie mismatch guard blocked address"
1934            );
1935        }
1936        newly_blocked
1937    }
1938
1939    fn prune_cookie_mismatch_guard_states(&mut self, now: Instant) {
1940        let guard = self.config.cookie_mismatch_guard;
1941        if !guard.enabled || guard.event_window.is_zero() {
1942            self.cookie_mismatch_guard_states.clear();
1943            return;
1944        }
1945
1946        self.cookie_mismatch_guard_states.retain(|_, state| {
1947            now.saturating_duration_since(state.window_started_at) <= guard.event_window
1948        });
1949    }
1950
1951    fn record_handshake_violation(
1952        &mut self,
1953        addr: SocketAddr,
1954        violation: HandshakeViolation,
1955        now: Instant,
1956    ) -> bool {
1957        let h = self.config.handshake_heuristics;
1958        if !h.enabled
1959            || h.score_threshold == 0
1960            || h.event_window.is_zero()
1961            || h.block_duration.is_zero()
1962        {
1963            return false;
1964        }
1965
1966        let points = violation.score(&self.config);
1967        if points == 0 {
1968            return false;
1969        }
1970        debug!(%addr, ?violation, points, "recorded handshake violation");
1971
1972        let mut should_block = false;
1973        {
1974            let entry =
1975                self.handshake_heuristics
1976                    .entry(addr.ip())
1977                    .or_insert(HandshakeHeuristicState {
1978                        window_started_at: now,
1979                        score: 0,
1980                    });
1981
1982            if now.saturating_duration_since(entry.window_started_at) > h.event_window {
1983                entry.window_started_at = now;
1984                entry.score = 0;
1985            }
1986
1987            entry.score = entry.score.saturating_add(points);
1988            if entry.score >= h.score_threshold {
1989                entry.window_started_at = now;
1990                entry.score = 0;
1991                should_block = true;
1992            }
1993        }
1994
1995        if !should_block {
1996            return false;
1997        }
1998
1999        let newly_blocked = self.rate_limiter.block_address_for_with_reason(
2000            addr.ip(),
2001            now,
2002            h.block_duration,
2003            BlockReason::HandshakeHeuristic,
2004        );
2005        if newly_blocked {
2006            self.handshake_auto_blocks = self.handshake_auto_blocks.saturating_add(1);
2007            warn!(
2008                %addr,
2009                ?violation,
2010                block_secs = h.block_duration.as_secs(),
2011                "handshake heuristic blocked address"
2012            );
2013        }
2014        newly_blocked
2015    }
2016
2017    fn prune_handshake_heuristic_states(&mut self, now: Instant) {
2018        let h = self.config.handshake_heuristics;
2019        if !h.enabled || h.event_window.is_zero() {
2020            self.handshake_heuristics.clear();
2021            return;
2022        }
2023
2024        self.handshake_heuristics.retain(|_, state| {
2025            now.saturating_duration_since(state.window_started_at) <= h.event_window
2026        });
2027    }
2028
2029    fn close_session(&mut self, addr: SocketAddr) {
2030        self.pending_handshakes.remove(&addr);
2031        if let Some(session) = self.sessions.remove(&addr) {
2032            let state = session.state();
2033            if should_count_closed_session(state) {
2034                self.sessions_closed_total = self.sessions_closed_total.saturating_add(1);
2035            }
2036            if should_mark_ip_recently_connected(state) {
2037                self.mark_ip_recently_connected(addr, Instant::now());
2038            }
2039        }
2040        self.session_pipelines.remove(&addr);
2041    }
2042
2043    fn has_active_session_for_offline_reject(&self, addr: SocketAddr) -> bool {
2044        let Some(session) = self.sessions.get(&addr) else {
2045            return false;
2046        };
2047        if self.pending_handshakes.contains_key(&addr) {
2048            return false;
2049        }
2050        matches!(
2051            session.state(),
2052            SessionState::Req2Recv
2053                | SessionState::Reply2Sent
2054                | SessionState::ConnReqRecv
2055                | SessionState::ConnReqAcceptedSent
2056                | SessionState::NewIncomingRecv
2057                | SessionState::Connected
2058                | SessionState::Closing
2059                | SessionState::Closed
2060        )
2061    }
2062
2063    fn is_request2_server_addr_allowed(
2064        &self,
2065        request_server_addr: SocketAddr,
2066        local_server_addr: SocketAddr,
2067    ) -> bool {
2068        match self.config.request2_server_addr_policy {
2069            Request2ServerAddrPolicy::Disabled => true,
2070            Request2ServerAddrPolicy::PortOnly => {
2071                request_server_addr.port() == local_server_addr.port()
2072            }
2073            Request2ServerAddrPolicy::Exact => {
2074                if local_server_addr.ip().is_unspecified() {
2075                    request_server_addr.port() == local_server_addr.port()
2076                } else {
2077                    request_server_addr == local_server_addr
2078                }
2079            }
2080        }
2081    }
2082
2083    fn is_ip_recently_connected(&mut self, addr: SocketAddr, now: Instant) -> bool {
2084        self.prune_ip_recently_connected(now);
2085        self.ip_recently_connected_until
2086            .get(&addr.ip())
2087            .is_some_and(|until| *until > now)
2088    }
2089
2090    fn mark_ip_recently_connected(&mut self, addr: SocketAddr, now: Instant) {
2091        let window = self.config.ip_recently_connected_window;
2092        if window.is_zero() {
2093            return;
2094        }
2095        self.ip_recently_connected_until
2096            .insert(addr.ip(), now + window);
2097    }
2098
2099    fn prune_ip_recently_connected(&mut self, now: Instant) {
2100        if self.config.ip_recently_connected_window.is_zero() {
2101            self.ip_recently_connected_until.clear();
2102            return;
2103        }
2104        self.ip_recently_connected_until
2105            .retain(|_, until| *until > now);
2106    }
2107
2108    fn record_remote_disconnect(&mut self, reason: RemoteDisconnectReason) {
2109        match reason {
2110            RemoteDisconnectReason::DisconnectionNotification { .. } => {
2111                self.remote_disconnect_notifications =
2112                    self.remote_disconnect_notifications.saturating_add(1);
2113            }
2114            RemoteDisconnectReason::DetectLostConnection => {
2115                self.remote_detect_lost_disconnects =
2116                    self.remote_detect_lost_disconnects.saturating_add(1);
2117            }
2118        }
2119    }
2120
2121    fn prune_idle_sessions(&mut self, now: Instant, protected_addr: Option<SocketAddr>) {
2122        let timeout = self.config.session_idle_timeout;
2123        if timeout.is_zero() {
2124            return;
2125        }
2126
2127        let mut stale = Vec::new();
2128        for (addr, session) in &self.sessions {
2129            if Some(*addr) == protected_addr {
2130                continue;
2131            }
2132            if session.state() != SessionState::Connected {
2133                continue;
2134            }
2135            if session.idle_for(now) >= timeout {
2136                stale.push(*addr);
2137            }
2138        }
2139
2140        for addr in stale {
2141            self.timed_out_sessions = self.timed_out_sessions.saturating_add(1);
2142            self.close_session(addr);
2143        }
2144    }
2145
2146    fn queue_keepalive_pings(&mut self, now: Instant) {
2147        let interval = self.config.session_keepalive_interval;
2148        if interval.is_zero() {
2149            return;
2150        }
2151
2152        let mut close_addrs = Vec::new();
2153        for (addr, session) in &mut self.sessions {
2154            if !session.should_send_keepalive(now, interval) {
2155                continue;
2156            }
2157
2158            let ping = ConnectedControlPacket::ConnectedPing(ConnectedPing {
2159                ping_time: unix_timestamp_millis(),
2160            });
2161
2162            if matches!(
2163                Self::queue_connected_control_packet(
2164                    session,
2165                    ping,
2166                    Reliability::Unreliable,
2167                    0,
2168                    RakPriority::Low,
2169                ),
2170                Ok(QueuePayloadResult::Enqueued { .. })
2171            ) {
2172                session.mark_keepalive_sent(now);
2173                self.keepalive_pings_sent = self.keepalive_pings_sent.saturating_add(1);
2174            }
2175
2176            if session.take_backpressure_disconnect() {
2177                close_addrs.push(*addr);
2178            }
2179        }
2180
2181        for addr in close_addrs {
2182            self.close_session(addr);
2183        }
2184    }
2185
2186    fn record_illegal_state_transition(&mut self, addr: SocketAddr) {
2187        self.illegal_state_transitions = self.illegal_state_transitions.saturating_add(1);
2188        self.close_session(addr);
2189    }
2190
2191    fn would_exceed_session_limit(&self, addr: SocketAddr) -> bool {
2192        !self.sessions.contains_key(&addr) && self.sessions.len() >= self.config.max_sessions
2193    }
2194
2195    fn negotiate_mtu(&self, requested: u16) -> u16 {
2196        let server_cap =
2197            self.config
2198                .mtu
2199                .clamp(MINIMUM_MTU_SIZE as usize, MAXIMUM_MTU_SIZE as usize) as u16;
2200        requested.clamp(MINIMUM_MTU_SIZE, server_cap)
2201    }
2202
2203    fn rotate_cookie_keys_if_needed(&mut self, now: Instant) {
2204        if !self.config.send_cookie || self.config.cookie_rotation_interval.is_zero() {
2205            return;
2206        }
2207        if now < self.next_cookie_rotation {
2208            return;
2209        }
2210
2211        let next_key = SecretCookieKey::new(random_cookie_key());
2212        let previous_key = std::mem::replace(&mut self.cookie_key_current, next_key);
2213        self.cookie_key_previous = Some(previous_key);
2214        self.next_cookie_rotation = now + self.config.cookie_rotation_interval;
2215        self.cookie_rotations = self.cookie_rotations.saturating_add(1);
2216    }
2217
2218    fn generate_cookie(&self, addr: SocketAddr) -> u32 {
2219        self.compute_cookie_for_key(addr, self.cookie_key_current.as_ref())
2220    }
2221
2222    fn verify_cookie(&self, addr: SocketAddr, cookie: Option<u32>) -> bool {
2223        let Some(cookie) = cookie else {
2224            return false;
2225        };
2226
2227        if self.compute_cookie_for_key(addr, self.cookie_key_current.as_ref()) == cookie {
2228            return true;
2229        }
2230
2231        if let Some(previous_key) = self.cookie_key_previous.as_ref()
2232            && self.compute_cookie_for_key(addr, previous_key.as_ref()) == cookie
2233        {
2234            return true;
2235        }
2236
2237        false
2238    }
2239
2240    fn compute_cookie_for_key(&self, addr: SocketAddr, key: &[u8]) -> u32 {
2241        let mut mac = HmacSha256::new_from_slice(key).expect("HMAC supports arbitrary key lengths");
2242        update_mac_with_socket_addr(&mut mac, addr);
2243        update_mac_with_socket_addr(&mut mac, self.config.bind_addr);
2244        mac.update(&self.config.server_guid.to_le_bytes());
2245        let tag = mac.finalize().into_bytes();
2246        u32::from_le_bytes([tag[0], tag[1], tag[2], tag[3]])
2247    }
2248}
2249
/// Wraps any displayable error in an `io::Error` of kind `InvalidData`,
/// using the error's `Display` output as the message.
fn invalid_data_io_error<E>(error: E) -> io::Error
where
    E: std::fmt::Display,
{
    let message = error.to_string();
    io::Error::new(io::ErrorKind::InvalidData, message)
}
2253
2254fn supports_protocol(supported: &[u8], incoming: u8) -> bool {
2255    if supported.is_empty() {
2256        return incoming == RAKNET_PROTOCOL_VERSION;
2257    }
2258    supported.contains(&incoming)
2259}
2260
2261fn primary_protocol_version(supported: &[u8]) -> u8 {
2262    supported
2263        .iter()
2264        .copied()
2265        .max()
2266        .unwrap_or(RAKNET_PROTOCOL_VERSION)
2267}
2268
2269fn random_cookie_key() -> [u8; COOKIE_KEY_LEN] {
2270    let mut key = [0u8; COOKIE_KEY_LEN];
2271    if getrandom::getrandom(&mut key).is_ok() {
2272        return key;
2273    }
2274
2275    let fallback = unix_timestamp_millis() as u64;
2276    for (idx, chunk) in key.chunks_exact_mut(8).enumerate() {
2277        let seed = fallback
2278            .wrapping_mul((idx as u64).saturating_add(1))
2279            .rotate_left((idx as u32).saturating_mul(11));
2280        chunk.copy_from_slice(&seed.to_le_bytes());
2281    }
2282    key
2283}
2284
2285fn invalid_config_io_error(error: crate::error::ConfigValidationError) -> io::Error {
2286    io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
2287}
2288
2289#[cfg(any(target_os = "linux", target_os = "android"))]
2290fn set_df_path_mtu_discovery_v4(socket: &Socket) -> io::Result<()> {
2291    let mode: libc::c_int = libc::IP_PMTUDISC_DO;
2292    let rc = unsafe {
2293        libc::setsockopt(
2294            socket.as_raw_fd(),
2295            libc::IPPROTO_IP,
2296            libc::IP_MTU_DISCOVER,
2297            &mode as *const libc::c_int as *const libc::c_void,
2298            mem::size_of::<libc::c_int>() as libc::socklen_t,
2299        )
2300    };
2301    if rc == 0 {
2302        return Ok(());
2303    }
2304    Err(io::Error::last_os_error())
2305}
2306
/// Stub for platforms other than Linux/Android: always fails with an
/// `Unsupported` error because the IPv4 option is not applied here.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
fn set_df_path_mtu_discovery_v4(_socket: &Socket) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "disable_ip_fragmentation is unsupported for IPv4 on this platform",
    );
    Err(unsupported)
}
2314
2315#[cfg(any(target_os = "linux", target_os = "android"))]
2316fn set_df_path_mtu_discovery_v6(socket: &Socket) -> io::Result<()> {
2317    let mode: libc::c_int = libc::IPV6_PMTUDISC_DO;
2318    let rc = unsafe {
2319        libc::setsockopt(
2320            socket.as_raw_fd(),
2321            libc::IPPROTO_IPV6,
2322            libc::IPV6_MTU_DISCOVER,
2323            &mode as *const libc::c_int as *const libc::c_void,
2324            mem::size_of::<libc::c_int>() as libc::socklen_t,
2325        )
2326    };
2327    if rc == 0 {
2328        return Ok(());
2329    }
2330    Err(io::Error::last_os_error())
2331}
2332
/// Stub for platforms other than Linux/Android: always fails with an
/// `Unsupported` error because the IPv6 option is not applied here.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
fn set_df_path_mtu_discovery_v6(_socket: &Socket) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "disable_ip_fragmentation is unsupported for IPv6 on this platform",
    );
    Err(unsupported)
}
2340
2341fn update_mac_with_socket_addr(mac: &mut HmacSha256, addr: SocketAddr) {
2342    match addr {
2343        SocketAddr::V4(v4) => {
2344            mac.update(&[4]);
2345            mac.update(&v4.ip().octets());
2346            mac.update(&v4.port().to_le_bytes());
2347        }
2348        SocketAddr::V6(v6) => {
2349            mac.update(&[6]);
2350            mac.update(&v6.ip().octets());
2351            mac.update(&v6.port().to_le_bytes());
2352            mac.update(&v6.flowinfo().to_le_bytes());
2353            mac.update(&v6.scope_id().to_le_bytes());
2354        }
2355    }
2356}
2357
/// Returns milliseconds elapsed since the Unix epoch.
///
/// The value is clamped to `i64::MAX`, and `0` is returned when the system
/// clock reports a time before the epoch.
fn unix_timestamp_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
2364
2365fn build_internal_addrs(server_addr: SocketAddr) -> [SocketAddr; SYSTEM_ADDRESS_COUNT] {
2366    let fallback = match server_addr {
2367        SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
2368        SocketAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(
2369            Ipv6Addr::UNSPECIFIED,
2370            0,
2371            0,
2372            v6.scope_id(),
2373        )),
2374    };
2375
2376    let mut addrs = [fallback; SYSTEM_ADDRESS_COUNT];
2377    addrs[0] = server_addr;
2378    addrs
2379}
2380
2381fn estimate_connected_datagram_processing_cost(raw_len: usize, datagram: &Datagram) -> usize {
2382    match &datagram.payload {
2383        DatagramPayload::Ack(payload) | DatagramPayload::Nack(payload) => {
2384            payload.ranges.len().saturating_mul(8).max(1)
2385        }
2386        DatagramPayload::Frames(frames) => {
2387            let mut cost = raw_len.max(1);
2388            cost = cost.saturating_add(frames.len().saturating_mul(48));
2389
2390            for frame in frames {
2391                let payload_len = frame.payload.len();
2392                cost = cost.saturating_add(payload_len);
2393
2394                if frame.header.reliability.is_reliable() {
2395                    cost = cost.saturating_add(32);
2396                }
2397                if frame.header.reliability.is_ordered() || frame.header.reliability.is_sequenced()
2398                {
2399                    cost = cost.saturating_add(24);
2400                }
2401                if frame.header.is_split {
2402                    let split_parts = frame
2403                        .split
2404                        .as_ref()
2405                        .map(|s| s.part_count as usize)
2406                        .unwrap_or(1);
2407                    cost = cost.saturating_add(512);
2408                    cost = cost.saturating_add(payload_len.saturating_mul(2));
2409                    cost = cost.saturating_add(split_parts.min(2_048));
2410                }
2411            }
2412
2413            cost.max(1)
2414        }
2415    }
2416}
2417
/// Returns `true` when `id` is one of the packet ids this module classifies as offline.
fn is_offline_packet_id(id: u8) -> bool {
    const OFFLINE_PACKET_IDS: [u8; 13] = [
        0x01, 0x02, 0x05, 0x06, 0x07, 0x08, 0x11, 0x12, 0x14, 0x17, 0x19, 0x1A, 0x1C,
    ];
    OFFLINE_PACKET_IDS.contains(&id)
}
2424
/// Returns `true` when `id` is one of the packet ids this module classifies as
/// connected control packets.
fn is_connected_control_id(id: u8) -> bool {
    const CONNECTED_CONTROL_IDS: [u8; 7] = [0x00, 0x03, 0x04, 0x09, 0x10, 0x13, 0x15];
    CONNECTED_CONTROL_IDS.contains(&id)
}
2428
2429fn is_post_reply2_handshake_state(state: SessionState) -> bool {
2430    matches!(
2431        state,
2432        SessionState::Reply2Sent
2433            | SessionState::ConnReqRecv
2434            | SessionState::ConnReqAcceptedSent
2435            | SessionState::NewIncomingRecv
2436    )
2437}
2438
2439fn should_mark_ip_recently_connected(state: SessionState) -> bool {
2440    state == SessionState::Connected
2441        || state == SessionState::Closing
2442        || state == SessionState::Closed
2443        || is_post_reply2_handshake_state(state)
2444}
2445
2446fn should_count_closed_session(state: SessionState) -> bool {
2447    matches!(
2448        state,
2449        SessionState::Connected | SessionState::Closing | SessionState::Closed
2450    )
2451}
2452
2453#[cfg(test)]
2454mod tests {
2455    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6};
2456    use std::time::{Duration, Instant};
2457
2458    use bytes::{Bytes, BytesMut};
2459    use tokio::runtime::Builder;
2460
2461    use super::{
2462        ConnectedFrameDelivery, ControlAction, HandshakeViolation, PendingHandshake,
2463        PendingHandshakeStage, RemoteDisconnectReason, TransportEvent,
2464        TransportProcessingBudgetConfig, TransportRateLimitConfig, TransportServer,
2465        build_internal_addrs, is_connected_control_id, is_offline_packet_id,
2466        primary_protocol_version, supports_protocol,
2467    };
2468    use crate::error::DecodeError;
2469    use crate::handshake::{
2470        OfflinePacket, OpenConnectionRequest1, OpenConnectionRequest2, Request2ParsePath,
2471    };
2472    use crate::protocol::connected::{
2473        ConnectedControlPacket, ConnectedPing, ConnectionRequest, DetectLostConnection,
2474        DisconnectionNotification,
2475    };
2476    use crate::protocol::constants::{
2477        DEFAULT_UNCONNECTED_MAGIC, DatagramFlags, RAKNET_PROTOCOL_VERSION,
2478    };
2479    use crate::protocol::datagram::{Datagram, DatagramHeader, DatagramPayload};
2480    use crate::protocol::frame::Frame;
2481    use crate::protocol::frame_header::FrameHeader;
2482    use crate::protocol::reliability::Reliability;
2483    use crate::protocol::sequence24::Sequence24;
2484    use crate::session::tunables::SessionTunables;
2485    use crate::session::{QueuePayloadResult, RakPriority, Session, SessionState};
2486    use crate::transport::config::{
2487        CookieMismatchGuardConfig, HandshakeHeuristicsConfig, ProcessingBudgetConfig,
2488        Request2ServerAddrPolicy, TransportConfig, TransportSocketTuning,
2489    };
2490
2491    fn build_test_server(mut config: TransportConfig) -> TransportServer {
2492        let rt = Builder::new_current_thread()
2493            .enable_all()
2494            .build()
2495            .expect("runtime must build");
2496
2497        rt.block_on(async move {
2498            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
2499                .await
2500                .expect("bind should succeed");
2501            config.bind_addr = socket.local_addr().expect("local addr should be available");
2502            TransportServer::with_socket(config, socket).expect("server should build")
2503        })
2504    }
2505
2506    #[test]
2507    fn connected_frame_delivery_preserves_payload_and_metadata() {
2508        let payload = Bytes::from_static(b"\xfepayload");
2509        let frame = Frame {
2510            header: FrameHeader {
2511                reliability: Reliability::ReliableOrdered,
2512                is_split: false,
2513                needs_bas: false,
2514            },
2515            bit_length: (payload.len() as u16) << 3,
2516            reliable_index: Some(Sequence24::new(3)),
2517            sequence_index: None,
2518            ordering_index: Some(Sequence24::new(8)),
2519            ordering_channel: Some(2),
2520            split: None,
2521            payload: payload.clone(),
2522        };
2523
2524        let delivered = ConnectedFrameDelivery::from_frame(frame);
2525        assert_eq!(delivered.payload, payload);
2526        assert_eq!(delivered.reliability, Reliability::ReliableOrdered);
2527        assert_eq!(delivered.reliable_index, Some(Sequence24::new(3)));
2528        assert_eq!(delivered.ordering_index, Some(Sequence24::new(8)));
2529        assert_eq!(delivered.ordering_channel, Some(2));
2530        assert!(delivered.sequence_index.is_none());
2531    }
2532
2533    #[test]
2534    fn apply_session_transitions_accepts_valid_path() {
2535        let mut session = Session::new(1492);
2536        let ok = TransportServer::apply_session_transitions(
2537            &mut session,
2538            &[
2539                SessionState::Req1Recv,
2540                SessionState::Reply1Sent,
2541                SessionState::Req2Recv,
2542                SessionState::Reply2Sent,
2543                SessionState::ConnReqRecv,
2544                SessionState::ConnReqAcceptedSent,
2545                SessionState::NewIncomingRecv,
2546                SessionState::Connected,
2547            ],
2548        );
2549        assert!(ok);
2550        assert_eq!(session.state(), SessionState::Connected);
2551    }
2552
2553    #[test]
2554    fn apply_session_transitions_rejects_invalid_jump() {
2555        let mut session = Session::new(1492);
2556        let ok =
2557            TransportServer::apply_session_transitions(&mut session, &[SessionState::ConnReqRecv]);
2558        assert!(!ok);
2559        assert_eq!(session.state(), SessionState::Offline);
2560    }
2561
2562    #[test]
2563    fn connected_ping_before_connected_state_is_rejected() {
2564        let mut session = Session::new(1492);
2565        let result = TransportServer::apply_connected_control(
2566            &mut session,
2567            "127.0.0.1:19132"
2568                .parse::<SocketAddr>()
2569                .expect("valid socket addr"),
2570            "127.0.0.1:19132"
2571                .parse::<SocketAddr>()
2572                .expect("valid socket addr"),
2573            123,
2574            ConnectedControlPacket::ConnectedPing(ConnectedPing { ping_time: 11 }),
2575        );
2576
2577        assert!(matches!(
2578            result,
2579            Ok(ControlAction::CloseSession {
2580                remote_reason: None,
2581                illegal_state: true,
2582            })
2583        ));
2584        assert_eq!(session.state(), SessionState::Offline);
2585    }
2586
2587    #[test]
2588    fn disconnection_notification_exposes_remote_reason_code() {
2589        let mut session = Session::new(1492);
2590        assert!(TransportServer::apply_session_transitions(
2591            &mut session,
2592            &[
2593                SessionState::Req1Recv,
2594                SessionState::Reply1Sent,
2595                SessionState::Req2Recv,
2596                SessionState::Reply2Sent,
2597                SessionState::ConnReqRecv,
2598                SessionState::ConnReqAcceptedSent,
2599                SessionState::NewIncomingRecv,
2600                SessionState::Connected,
2601            ],
2602        ));
2603
2604        let result = TransportServer::apply_connected_control(
2605            &mut session,
2606            "127.0.0.1:19132"
2607                .parse::<SocketAddr>()
2608                .expect("valid socket addr"),
2609            "127.0.0.1:19132"
2610                .parse::<SocketAddr>()
2611                .expect("valid socket addr"),
2612            123,
2613            ConnectedControlPacket::DisconnectionNotification(DisconnectionNotification {
2614                reason: Some(7),
2615            }),
2616        );
2617
2618        assert!(matches!(
2619            result,
2620            Ok(ControlAction::CloseSession {
2621                remote_reason: Some(RemoteDisconnectReason::DisconnectionNotification {
2622                    reason_code: Some(7)
2623                }),
2624                illegal_state: false,
2625            })
2626        ));
2627        assert_eq!(session.state(), SessionState::Closed);
2628    }
2629
2630    #[test]
2631    fn detect_lost_connection_is_reported_as_remote_disconnect() {
2632        let mut session = Session::new(1492);
2633        assert!(TransportServer::apply_session_transitions(
2634            &mut session,
2635            &[
2636                SessionState::Req1Recv,
2637                SessionState::Reply1Sent,
2638                SessionState::Req2Recv,
2639                SessionState::Reply2Sent,
2640                SessionState::ConnReqRecv,
2641                SessionState::ConnReqAcceptedSent,
2642                SessionState::NewIncomingRecv,
2643                SessionState::Connected,
2644            ],
2645        ));
2646
2647        let result = TransportServer::apply_connected_control(
2648            &mut session,
2649            "127.0.0.1:19132"
2650                .parse::<SocketAddr>()
2651                .expect("valid socket addr"),
2652            "127.0.0.1:19132"
2653                .parse::<SocketAddr>()
2654                .expect("valid socket addr"),
2655            123,
2656            ConnectedControlPacket::DetectLostConnection(DetectLostConnection),
2657        );
2658
2659        assert!(matches!(
2660            result,
2661            Ok(ControlAction::CloseSession {
2662                remote_reason: Some(RemoteDisconnectReason::DetectLostConnection),
2663                illegal_state: false,
2664            })
2665        ));
2666        assert_eq!(session.state(), SessionState::Closed);
2667    }
2668
2669    #[test]
2670    fn connection_request_after_reply2_is_accepted() {
2671        let mut session = Session::new(1492);
2672        assert!(TransportServer::apply_session_transitions(
2673            &mut session,
2674            &[
2675                SessionState::Req1Recv,
2676                SessionState::Reply1Sent,
2677                SessionState::Req2Recv,
2678                SessionState::Reply2Sent,
2679            ],
2680        ));
2681
2682        let result = TransportServer::apply_connected_control(
2683            &mut session,
2684            "127.0.0.1:19132"
2685                .parse::<SocketAddr>()
2686                .expect("valid socket addr"),
2687            "127.0.0.1:19132"
2688                .parse::<SocketAddr>()
2689                .expect("valid socket addr"),
2690            123,
2691            ConnectedControlPacket::ConnectionRequest(ConnectionRequest {
2692                client_guid: 42,
2693                request_time: 77,
2694                use_encryption: false,
2695            }),
2696        );
2697
2698        assert!(matches!(result, Ok(ControlAction::None)));
2699        assert_eq!(session.state(), SessionState::ConnReqAcceptedSent);
2700    }
2701
2702    #[test]
2703    fn protocol_support_accepts_configured_versions() {
2704        let versions = [10, 11, 12];
2705        assert!(supports_protocol(&versions, 10));
2706        assert!(supports_protocol(&versions, 12));
2707        assert!(!supports_protocol(&versions, 9));
2708        assert!(supports_protocol(&[], RAKNET_PROTOCOL_VERSION));
2709        assert!(!supports_protocol(
2710            &[],
2711            RAKNET_PROTOCOL_VERSION.saturating_sub(1)
2712        ));
2713    }
2714
2715    #[test]
2716    fn primary_protocol_version_uses_highest_configured_version() {
2717        let versions = [11, 13, 12];
2718        assert_eq!(primary_protocol_version(&versions), 13);
2719        assert_eq!(primary_protocol_version(&[]), RAKNET_PROTOCOL_VERSION);
2720    }
2721
2722    #[test]
2723    fn cookie_rotation_keeps_previous_key_valid_temporarily() {
2724        let config = TransportConfig {
2725            send_cookie: true,
2726            cookie_rotation_interval: Duration::from_secs(1),
2727            ..TransportConfig::default()
2728        };
2729        let mut server = build_test_server(config);
2730        let addr = "127.0.0.1:19132"
2731            .parse::<SocketAddr>()
2732            .expect("valid socket addr");
2733
2734        let mut original_key = [0u8; super::COOKIE_KEY_LEN];
2735        original_key.copy_from_slice(server.cookie_key_current.as_ref());
2736        let old_cookie = server.generate_cookie(addr);
2737        server.rotate_cookie_keys_if_needed(Instant::now() + Duration::from_secs(2));
2738
2739        assert_eq!(server.cookie_rotations, 1);
2740        let previous_key = server.cookie_key_previous.as_ref().map(|key| {
2741            let mut value = [0u8; super::COOKIE_KEY_LEN];
2742            value.copy_from_slice(key.as_ref());
2743            value
2744        });
2745        assert_eq!(
2746            previous_key,
2747            Some(original_key),
2748            "rotating cookies must retain previous key for grace period"
2749        );
2750        assert!(server.verify_cookie(addr, Some(old_cookie)));
2751    }
2752
2753    #[test]
2754    fn cookie_is_bound_to_socket_address() {
2755        let config = TransportConfig {
2756            send_cookie: true,
2757            ..TransportConfig::default()
2758        };
2759        let server = build_test_server(config);
2760        let addr_a = "127.0.0.1:19132"
2761            .parse::<SocketAddr>()
2762            .expect("valid socket addr");
2763        let addr_b = "127.0.0.2:19132"
2764            .parse::<SocketAddr>()
2765            .expect("valid socket addr");
2766
2767        let cookie = server.generate_cookie(addr_a);
2768        assert!(server.verify_cookie(addr_a, Some(cookie)));
2769        assert!(!server.verify_cookie(addr_b, Some(cookie)));
2770    }
2771
2772    #[test]
2773    fn cookie_mismatch_guard_blocks_after_threshold() {
2774        let config = TransportConfig {
2775            cookie_mismatch_guard: CookieMismatchGuardConfig {
2776                enabled: true,
2777                event_window: Duration::from_secs(30),
2778                mismatch_threshold: 2,
2779                block_duration: Duration::from_secs(10),
2780            },
2781            ..TransportConfig::default()
2782        };
2783        let mut server = build_test_server(config);
2784        let ip: IpAddr = "203.0.113.9".parse().expect("valid ip");
2785        let now = Instant::now();
2786
2787        assert!(!server.record_cookie_mismatch(ip, now));
2788        assert!(server.record_cookie_mismatch(ip, now + Duration::from_millis(1)));
2789
2790        let metrics = server.metrics_snapshot();
2791        assert_eq!(metrics.cookie_mismatch_blocks, 1);
2792        assert_eq!(metrics.rate_blocked_addresses, 1);
2793    }
2794
2795    #[test]
2796    fn request2_cookie_mismatch_updates_metrics_and_uses_guard_blocking() {
2797        let rt = Builder::new_current_thread()
2798            .enable_all()
2799            .build()
2800            .expect("runtime must build");
2801        rt.block_on(async {
2802            let mut config = TransportConfig {
2803                send_cookie: true,
2804                handshake_heuristics: HandshakeHeuristicsConfig {
2805                    enabled: false,
2806                    ..HandshakeHeuristicsConfig::default()
2807                },
2808                cookie_mismatch_guard: CookieMismatchGuardConfig {
2809                    enabled: true,
2810                    event_window: Duration::from_secs(30),
2811                    mismatch_threshold: 1,
2812                    block_duration: Duration::from_secs(10),
2813                },
2814                ..TransportConfig::default()
2815            };
2816            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
2817                .await
2818                .expect("bind should succeed");
2819            config.bind_addr = socket.local_addr().expect("local addr should be available");
2820            let mut server =
2821                TransportServer::with_socket(config, socket).expect("server should build");
2822            let addr = "127.0.0.1:20320"
2823                .parse::<SocketAddr>()
2824                .expect("valid socket addr");
2825
2826            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
2827                protocol_version: RAKNET_PROTOCOL_VERSION,
2828                mtu: 1200,
2829                magic: DEFAULT_UNCONNECTED_MAGIC,
2830            });
2831            server
2832                .handle_offline_packet(addr, &request1, Instant::now())
2833                .await
2834                .expect("request1 handling should succeed");
2835
2836            let wrong_cookie = server.generate_cookie(addr).wrapping_add(1);
2837            let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
2838                server_addr: server.local_addr().expect("local addr should be available"),
2839                mtu: 1200,
2840                client_guid: 0xABCD_ABCD_ABCD_ABCD,
2841                cookie: Some(wrong_cookie),
2842                client_proof: true,
2843                parse_path: Request2ParsePath::StrictWithCookie,
2844                magic: DEFAULT_UNCONNECTED_MAGIC,
2845            });
2846            let result = server
2847                .handle_offline_packet(addr, &request2, Instant::now())
2848                .await
2849                .expect("request2 handling should succeed");
2850            assert!(result.is_none());
2851
2852            let metrics = server.metrics_snapshot();
2853            assert_eq!(metrics.cookie_mismatch_drops, 1);
2854            assert_eq!(metrics.cookie_mismatch_blocks, 1);
2855            assert_eq!(metrics.rate_blocked_addresses, 1);
2856            assert_eq!(metrics.rate_addresses_blocked_cookie_mismatch_guard, 1);
2857        });
2858    }
2859
2860    #[test]
2861    fn pending_handshake_timeout_updates_metrics_and_closes_session() {
2862        let mut server = build_test_server(TransportConfig::default());
2863        let addr = "127.0.0.1:20001"
2864            .parse::<SocketAddr>()
2865            .expect("valid socket addr");
2866        let now = Instant::now();
2867
2868        let mut session = Session::new(1492);
2869        assert!(TransportServer::apply_session_transitions(
2870            &mut session,
2871            &[SessionState::Req1Recv, SessionState::Reply1Sent]
2872        ));
2873        server.sessions.insert(addr, session);
2874        server.pending_handshakes.insert(
2875            addr,
2876            PendingHandshake {
2877                mtu: 1492,
2878                cookie: None,
2879                client_guid: None,
2880                stage: PendingHandshakeStage::AwaitingRequest2,
2881                expires_at: now - Duration::from_millis(1),
2882            },
2883        );
2884
2885        server.prune_pending_handshakes(now);
2886
2887        assert!(!server.sessions.contains_key(&addr));
2888        assert!(!server.pending_handshakes.contains_key(&addr));
2889
2890        let metrics = server.metrics_snapshot();
2891        assert_eq!(metrics.handshake_req1_req2_timeouts, 1);
2892        assert_eq!(metrics.handshake_stage_cancel_drops, 1);
2893    }
2894
2895    #[test]
2896    fn reply2_connect_timeout_updates_metrics_and_closes_session() {
2897        let config = TransportConfig {
2898            handshake_reply2_connect_timeout: Duration::from_secs(1),
2899            ..TransportConfig::default()
2900        };
2901        let mut server = build_test_server(config);
2902        let addr = "127.0.0.1:20002"
2903            .parse::<SocketAddr>()
2904            .expect("valid socket addr");
2905        let now = Instant::now();
2906
2907        let mut session = Session::new(1492);
2908        assert!(TransportServer::apply_session_transitions(
2909            &mut session,
2910            &[
2911                SessionState::Req1Recv,
2912                SessionState::Reply1Sent,
2913                SessionState::Req2Recv,
2914                SessionState::Reply2Sent,
2915            ]
2916        ));
2917        session.touch_activity(now - Duration::from_secs(2));
2918        server.sessions.insert(addr, session);
2919        server.pending_handshakes.insert(
2920            addr,
2921            PendingHandshake {
2922                mtu: 1492,
2923                cookie: None,
2924                client_guid: None,
2925                stage: PendingHandshakeStage::AwaitingConnectionRequest,
2926                expires_at: now - Duration::from_millis(1),
2927            },
2928        );
2929
2930        server.prune_pending_handshakes(now);
2931
2932        assert!(!server.sessions.contains_key(&addr));
2933        let metrics = server.metrics_snapshot();
2934        assert_eq!(metrics.handshake_reply2_connect_timeouts, 1);
2935        assert_eq!(metrics.handshake_stage_cancel_drops, 1);
2936    }
2937
2938    #[test]
2939    fn handshake_heuristic_blocks_after_threshold() {
2940        let config = TransportConfig {
2941            handshake_heuristics: HandshakeHeuristicsConfig {
2942                enabled: true,
2943                event_window: Duration::from_secs(30),
2944                block_duration: Duration::from_secs(10),
2945                score_threshold: 3,
2946                req1_req2_timeout_score: 1,
2947                reply2_connect_timeout_score: 1,
2948                missing_req1_score: 1,
2949                cookie_mismatch_score: 2,
2950                parse_anomaly_score: 1,
2951            },
2952            ..TransportConfig::default()
2953        };
2954        let mut server = build_test_server(config);
2955        let addr = "127.0.0.1:20003"
2956            .parse::<SocketAddr>()
2957            .expect("valid socket addr");
2958        let now = Instant::now();
2959
2960        let blocked_1 =
2961            server.record_handshake_violation(addr, HandshakeViolation::CookieMismatch, now);
2962        let blocked_2 = server.record_handshake_violation(
2963            addr,
2964            HandshakeViolation::CookieMismatch,
2965            now + Duration::from_millis(1),
2966        );
2967
2968        assert!(!blocked_1);
2969        assert!(blocked_2);
2970
2971        let metrics = server.metrics_snapshot();
2972        assert_eq!(metrics.handshake_auto_blocks, 1);
2973        assert_eq!(metrics.rate_blocked_addresses, 1);
2974        assert_eq!(metrics.rate_addresses_blocked_handshake_heuristic, 1);
2975    }
2976
2977    #[test]
2978    fn packet_id_classification_and_internal_addresses_are_consistent() {
2979        assert!(is_offline_packet_id(0x05));
2980        assert!(!is_offline_packet_id(0x03));
2981        assert!(is_connected_control_id(0x03));
2982        assert!(!is_connected_control_id(0x05));
2983
2984        let server_addr = "10.1.2.3:19132"
2985            .parse::<SocketAddr>()
2986            .expect("valid socket addr");
2987        let internal = build_internal_addrs(server_addr);
2988        assert_eq!(internal[0], server_addr);
2989        for addr in internal.iter().skip(1) {
2990            assert!(addr.ip().is_unspecified());
2991            assert_eq!(addr.port(), 0);
2992        }
2993    }
2994
2995    #[test]
2996    fn open_connection_request1_creates_session_with_transport_tunables() {
2997        let rt = Builder::new_current_thread()
2998            .enable_all()
2999            .build()
3000            .expect("runtime must build");
3001        rt.block_on(async {
3002            let mut config = TransportConfig {
3003                session_tunables: SessionTunables {
3004                    outgoing_queue_max_frames: 1,
3005                    outgoing_queue_max_bytes: 128,
3006                    outgoing_queue_soft_ratio: 0.95,
3007                    ..SessionTunables::default()
3008                },
3009                ..TransportConfig::default()
3010            };
3011            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3012                .await
3013                .expect("bind should succeed");
3014            config.bind_addr = socket.local_addr().expect("local addr should be available");
3015            let mut server =
3016                TransportServer::with_socket(config, socket).expect("server should build");
3017            let addr = "127.0.0.1:20123"
3018                .parse::<SocketAddr>()
3019                .expect("valid socket addr");
3020
3021            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3022                protocol_version: RAKNET_PROTOCOL_VERSION,
3023                mtu: 1200,
3024                magic: DEFAULT_UNCONNECTED_MAGIC,
3025            });
3026
3027            let event = server
3028                .handle_offline_packet(addr, &request1, Instant::now())
3029                .await
3030                .expect("offline request1 handling should succeed");
3031            assert!(
3032                event.is_none(),
3033                "request1 should not emit a transport event in healthy path"
3034            );
3035
3036            let session = server
3037                .sessions
3038                .get_mut(&addr)
3039                .expect("request1 must create a pending session");
3040            let first = session.queue_payload(
3041                Bytes::from_static(b"\xFEfirst"),
3042                Reliability::Unreliable,
3043                0,
3044                RakPriority::Normal,
3045            );
3046            let second = session.queue_payload(
3047                Bytes::from_static(b"\xFEsecond"),
3048                Reliability::Unreliable,
3049                0,
3050                RakPriority::Normal,
3051            );
3052
3053            assert!(
3054                matches!(first, QueuePayloadResult::Enqueued { .. }),
3055                "first packet should fit in queue"
3056            );
3057            assert_eq!(
3058                second,
3059                QueuePayloadResult::Dropped,
3060                "second packet should be dropped because configured queue max frames=1"
3061            );
3062        });
3063    }
3064
3065    #[test]
3066    fn process_next_event_uses_configured_unconnected_magic() {
3067        let rt = Builder::new_current_thread()
3068            .enable_all()
3069            .build()
3070            .expect("runtime must build");
3071
3072        rt.block_on(async {
3073            let custom_magic = [
3074                0x13, 0x57, 0x9B, 0xDF, 0x24, 0x68, 0xAC, 0xF0, 0x10, 0x32, 0x54, 0x76, 0x98, 0xBA,
3075                0xDC, 0xFE,
3076            ];
3077            let mut config = TransportConfig {
3078                unconnected_magic: custom_magic,
3079                ..TransportConfig::default()
3080            };
3081
3082            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3083                .await
3084                .expect("bind should succeed");
3085            config.bind_addr = socket.local_addr().expect("local addr should be available");
3086            let server_addr = config.bind_addr;
3087            let mut server =
3088                TransportServer::with_socket(config, socket).expect("server should build");
3089
3090            let client = tokio::net::UdpSocket::bind("127.0.0.1:0")
3091                .await
3092                .expect("client bind should succeed");
3093
3094            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3095                protocol_version: RAKNET_PROTOCOL_VERSION,
3096                mtu: 1200,
3097                magic: custom_magic,
3098            });
3099            let mut buf = BytesMut::new();
3100            request1.encode(&mut buf).expect("encode must succeed");
3101            client
3102                .send_to(&buf, server_addr)
3103                .await
3104                .expect("send should succeed");
3105
3106            let event = tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3107                .await
3108                .expect("server should receive packet in time")
3109                .expect("processing should succeed");
3110            assert!(
3111                matches!(event, TransportEvent::OfflinePacket { .. }),
3112                "matching custom magic should parse as offline packet event"
3113            );
3114        });
3115    }
3116
3117    #[test]
3118    fn process_next_event_rejects_mismatched_unconnected_magic() {
3119        let rt = Builder::new_current_thread()
3120            .enable_all()
3121            .build()
3122            .expect("runtime must build");
3123
3124        rt.block_on(async {
3125            let custom_magic = [
3126                0x13, 0x57, 0x9B, 0xDF, 0x24, 0x68, 0xAC, 0xF0, 0x10, 0x32, 0x54, 0x76, 0x98, 0xBA,
3127                0xDC, 0xFE,
3128            ];
3129            let mut config = TransportConfig {
3130                unconnected_magic: custom_magic,
3131                ..TransportConfig::default()
3132            };
3133
3134            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3135                .await
3136                .expect("bind should succeed");
3137            config.bind_addr = socket.local_addr().expect("local addr should be available");
3138            let server_addr = config.bind_addr;
3139            let mut server =
3140                TransportServer::with_socket(config, socket).expect("server should build");
3141
3142            let client = tokio::net::UdpSocket::bind("127.0.0.1:0")
3143                .await
3144                .expect("client bind should succeed");
3145
3146            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3147                protocol_version: RAKNET_PROTOCOL_VERSION,
3148                mtu: 1200,
3149                magic: DEFAULT_UNCONNECTED_MAGIC,
3150            });
3151            let mut buf = BytesMut::new();
3152            request1.encode(&mut buf).expect("encode must succeed");
3153            client
3154                .send_to(&buf, server_addr)
3155                .await
3156                .expect("send should succeed");
3157
3158            let event = tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3159                .await
3160                .expect("server should receive packet in time")
3161                .expect("processing should succeed");
3162            assert!(
3163                matches!(
3164                    event,
3165                    TransportEvent::DecodeError {
3166                        error: DecodeError::InvalidMagic,
3167                        ..
3168                    }
3169                ),
3170                "mismatched magic should be rejected as invalid magic"
3171            );
3172        });
3173    }
3174
3175    #[test]
3176    fn rate_limit_config_updates_sync_with_transport_config() {
3177        let mut server = build_test_server(TransportConfig::default());
3178        server.set_rate_limit_config(TransportRateLimitConfig {
3179            per_ip_packet_limit: 0,
3180            global_packet_limit: 0,
3181            rate_window: Duration::ZERO,
3182            block_duration: Duration::ZERO,
3183        });
3184
3185        let effective = server.rate_limit_config();
3186        assert_eq!(effective.per_ip_packet_limit, 1);
3187        assert_eq!(effective.global_packet_limit, 1);
3188        assert_eq!(effective.rate_window, Duration::from_millis(1));
3189        assert_eq!(effective.block_duration, Duration::from_millis(1));
3190        assert_eq!(server.config().per_ip_packet_limit, 1);
3191        assert_eq!(server.config().global_packet_limit, 1);
3192        assert_eq!(server.config().rate_window, Duration::from_millis(1));
3193        assert_eq!(server.config().block_duration, Duration::from_millis(1));
3194    }
3195
3196    #[test]
3197    fn permanent_block_api_tracks_metrics_and_requires_manual_unblock() {
3198        let mut server = build_test_server(TransportConfig::default());
3199        let ip: IpAddr = "198.51.100.9".parse().expect("valid ip");
3200
3201        assert!(server.block_address(ip));
3202        assert!(!server.block_address(ip));
3203        let metrics_after_block = server.metrics_snapshot();
3204        assert_eq!(metrics_after_block.rate_blocked_addresses, 1);
3205        assert_eq!(metrics_after_block.rate_addresses_blocked_manual, 1);
3206
3207        assert!(server.unblock_address(ip));
3208        assert_eq!(server.metrics_snapshot().rate_blocked_addresses, 0);
3209    }
3210
3211    #[test]
3212    fn local_and_remote_disconnect_metrics_are_tracked_separately() {
3213        let mut server = build_test_server(TransportConfig::default());
3214        let addr = "127.0.0.1:20100"
3215            .parse::<SocketAddr>()
3216            .expect("valid socket addr");
3217        let mut session = Session::new(1492);
3218        assert!(TransportServer::apply_session_transitions(
3219            &mut session,
3220            &[
3221                SessionState::Req1Recv,
3222                SessionState::Reply1Sent,
3223                SessionState::Req2Recv,
3224                SessionState::Reply2Sent,
3225                SessionState::ConnReqRecv,
3226                SessionState::ConnReqAcceptedSent,
3227                SessionState::NewIncomingRecv,
3228                SessionState::Connected,
3229            ],
3230        ));
3231        server.sessions.insert(addr, session);
3232
3233        assert!(server.disconnect_peer(addr));
3234        server.record_remote_disconnect(RemoteDisconnectReason::DisconnectionNotification {
3235            reason_code: Some(3),
3236        });
3237        server.record_remote_disconnect(RemoteDisconnectReason::DetectLostConnection);
3238
3239        let metrics = server.metrics_snapshot();
3240        assert_eq!(metrics.local_requested_disconnects, 1);
3241        assert_eq!(metrics.remote_disconnect_notifications, 1);
3242        assert_eq!(metrics.remote_detect_lost_disconnects, 1);
3243        assert_eq!(metrics.sessions_closed_total, 1);
3244    }
3245
3246    #[test]
3247    fn processing_budget_config_updates_sync_with_transport_config() {
3248        let mut server = build_test_server(TransportConfig::default());
3249        server.set_processing_budget_config(TransportProcessingBudgetConfig {
3250            enabled: true,
3251            per_ip_refill_units_per_sec: 0,
3252            per_ip_burst_units: 0,
3253            global_refill_units_per_sec: 0,
3254            global_burst_units: 0,
3255            bucket_idle_ttl: Duration::ZERO,
3256        });
3257
3258        let effective = server.processing_budget_config();
3259        assert!(effective.enabled);
3260        assert_eq!(effective.per_ip_refill_units_per_sec, 1);
3261        assert_eq!(effective.per_ip_burst_units, 1);
3262        assert_eq!(effective.global_refill_units_per_sec, 1);
3263        assert_eq!(effective.global_burst_units, 1);
3264        assert_eq!(effective.bucket_idle_ttl, Duration::from_millis(1));
3265        assert_eq!(
3266            server
3267                .config()
3268                .processing_budget
3269                .per_ip_refill_units_per_sec,
3270            1
3271        );
3272        assert_eq!(server.config().processing_budget.per_ip_burst_units, 1);
3273        assert_eq!(
3274            server
3275                .config()
3276                .processing_budget
3277                .global_refill_units_per_sec,
3278            1
3279        );
3280        assert_eq!(server.config().processing_budget.global_burst_units, 1);
3281        assert_eq!(
3282            server.config().processing_budget.bucket_idle_ttl,
3283            Duration::from_millis(1)
3284        );
3285    }
3286
3287    #[test]
3288    fn processing_budget_exhaustion_drops_datagram_without_forcing_disconnect() {
3289        let rt = Builder::new_current_thread()
3290            .enable_all()
3291            .build()
3292            .expect("runtime must build");
3293        rt.block_on(async {
3294            let mut config = TransportConfig {
3295                per_ip_packet_limit: 10_000,
3296                global_packet_limit: 10_000,
3297                processing_budget: ProcessingBudgetConfig {
3298                    enabled: true,
3299                    per_ip_refill_units_per_sec: 1,
3300                    per_ip_burst_units: 1,
3301                    global_refill_units_per_sec: 10_000,
3302                    global_burst_units: 10_000,
3303                    bucket_idle_ttl: Duration::from_secs(5),
3304                },
3305                ..TransportConfig::default()
3306            };
3307
3308            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3309                .await
3310                .expect("bind should succeed");
3311            config.bind_addr = socket.local_addr().expect("local addr should be available");
3312            let server_addr = config.bind_addr;
3313            let mut server =
3314                TransportServer::with_socket(config, socket).expect("server should build");
3315
3316            let client = tokio::net::UdpSocket::bind("127.0.0.1:0")
3317                .await
3318                .expect("client bind should succeed");
3319            let client_addr = client.local_addr().expect("client local addr");
3320
3321            let mut session = Session::new(1492);
3322            assert!(TransportServer::apply_session_transitions(
3323                &mut session,
3324                &[
3325                    SessionState::Req1Recv,
3326                    SessionState::Reply1Sent,
3327                    SessionState::Req2Recv,
3328                    SessionState::Reply2Sent,
3329                    SessionState::ConnReqRecv,
3330                    SessionState::ConnReqAcceptedSent,
3331                    SessionState::NewIncomingRecv,
3332                    SessionState::Connected,
3333                ],
3334            ));
3335            server.sessions.insert(client_addr, session);
3336
3337            let make_datagram = |seq: u32| Datagram {
3338                header: DatagramHeader {
3339                    flags: DatagramFlags::VALID,
3340                    sequence: Sequence24::new(seq),
3341                },
3342                payload: DatagramPayload::Frames(vec![Frame {
3343                    header: FrameHeader::new(Reliability::Unreliable, false, false),
3344                    bit_length: (8u16) << 3,
3345                    reliable_index: None,
3346                    sequence_index: None,
3347                    ordering_index: None,
3348                    ordering_channel: None,
3349                    split: None,
3350                    payload: Bytes::from_static(b"payload!"),
3351                }]),
3352            };
3353
3354            let mut first = BytesMut::new();
3355            make_datagram(1)
3356                .encode(&mut first)
3357                .expect("datagram encode should succeed");
3358            client
3359                .send_to(&first, server_addr)
3360                .await
3361                .expect("send first datagram should succeed");
3362            let first_event =
3363                tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3364                    .await
3365                    .expect("first recv should complete")
3366                    .expect("first recv should succeed");
3367            assert!(
3368                matches!(first_event, TransportEvent::ConnectedFrames { .. }),
3369                "first datagram should pass before budget is depleted"
3370            );
3371
3372            let mut second = BytesMut::new();
3373            make_datagram(2)
3374                .encode(&mut second)
3375                .expect("datagram encode should succeed");
3376            client
3377                .send_to(&second, server_addr)
3378                .await
3379                .expect("send second datagram should succeed");
3380            let second_event =
3381                tokio::time::timeout(Duration::from_secs(1), server.recv_and_process())
3382                    .await
3383                    .expect("second recv should complete")
3384                    .expect("second recv should succeed");
3385            assert!(
3386                matches!(second_event, TransportEvent::RateLimited { addr } if addr == client_addr),
3387                "second datagram should be dropped by processing budget"
3388            );
3389
3390            assert!(
3391                server.sessions.contains_key(&client_addr),
3392                "budget drop must not force disconnect"
3393            );
3394            let metrics = server.metrics_snapshot();
3395            assert_eq!(metrics.processing_budget_drops_total, 1);
3396            assert_eq!(metrics.processing_budget_drops_ip_exhausted_total, 1);
3397        });
3398    }
3399
3400    #[test]
3401    fn open_connection_request2_rejects_already_connected_session() {
3402        let rt = Builder::new_current_thread()
3403            .enable_all()
3404            .build()
3405            .expect("runtime must build");
3406        rt.block_on(async {
3407            let mut config = TransportConfig::default();
3408            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3409                .await
3410                .expect("bind should succeed");
3411            config.bind_addr = socket.local_addr().expect("local addr should be available");
3412            let mut server =
3413                TransportServer::with_socket(config, socket).expect("server should build");
3414            let addr = "127.0.0.1:20310"
3415                .parse::<SocketAddr>()
3416                .expect("valid socket addr");
3417
3418            let mut session = Session::new(1492);
3419            assert!(TransportServer::apply_session_transitions(
3420                &mut session,
3421                &[
3422                    SessionState::Req1Recv,
3423                    SessionState::Reply1Sent,
3424                    SessionState::Req2Recv,
3425                    SessionState::Reply2Sent,
3426                    SessionState::ConnReqRecv,
3427                    SessionState::ConnReqAcceptedSent,
3428                    SessionState::NewIncomingRecv,
3429                    SessionState::Connected,
3430                ]
3431            ));
3432            server.sessions.insert(addr, session);
3433
3434            let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3435                server_addr: server.local_addr().expect("local addr should be available"),
3436                mtu: 1200,
3437                client_guid: 0xAABB_CCDD_EEFF_0011,
3438                cookie: None,
3439                client_proof: false,
3440                parse_path: Request2ParsePath::StrictNoCookie,
3441                magic: DEFAULT_UNCONNECTED_MAGIC,
3442            });
3443
3444            let event = server
3445                .handle_offline_packet(addr, &request2, Instant::now())
3446                .await
3447                .expect("request2 handling should succeed");
3448            assert!(event.is_none(), "reject path must not emit transport event");
3449
3450            let metrics = server.metrics_snapshot();
3451            assert_eq!(metrics.handshake_already_connected_rejects, 1);
3452            assert_eq!(metrics.handshake_missing_req1_drops, 0);
3453        });
3454    }
3455
3456    #[test]
3457    fn open_connection_request1_rejects_ip_recently_connected() {
3458        let rt = Builder::new_current_thread()
3459            .enable_all()
3460            .build()
3461            .expect("runtime must build");
3462        rt.block_on(async {
3463            let mut config = TransportConfig {
3464                ip_recently_connected_window: Duration::from_secs(3),
3465                ..TransportConfig::default()
3466            };
3467            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3468                .await
3469                .expect("bind should succeed");
3470            config.bind_addr = socket.local_addr().expect("local addr should be available");
3471            let mut server =
3472                TransportServer::with_socket(config, socket).expect("server should build");
3473            let addr = "127.0.0.1:20311"
3474                .parse::<SocketAddr>()
3475                .expect("valid socket addr");
3476
3477            let mut session = Session::new(1492);
3478            assert!(TransportServer::apply_session_transitions(
3479                &mut session,
3480                &[
3481                    SessionState::Req1Recv,
3482                    SessionState::Reply1Sent,
3483                    SessionState::Req2Recv,
3484                    SessionState::Reply2Sent,
3485                    SessionState::ConnReqRecv,
3486                    SessionState::ConnReqAcceptedSent,
3487                    SessionState::NewIncomingRecv,
3488                    SessionState::Connected,
3489                ]
3490            ));
3491            server.sessions.insert(addr, session);
3492            server.close_session(addr);
3493
3494            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3495                protocol_version: RAKNET_PROTOCOL_VERSION,
3496                mtu: 1200,
3497                magic: DEFAULT_UNCONNECTED_MAGIC,
3498            });
3499
3500            let event = server
3501                .handle_offline_packet(addr, &request1, Instant::now())
3502                .await
3503                .expect("request1 handling should succeed");
3504            assert!(event.is_none(), "reject path must not emit transport event");
3505            assert!(
3506                !server.pending_handshakes.contains_key(&addr),
3507                "rejected address must not create pending handshake"
3508            );
3509            assert!(
3510                !server.sessions.contains_key(&addr),
3511                "rejected address must not create session"
3512            );
3513
3514            let metrics = server.metrics_snapshot();
3515            assert_eq!(metrics.handshake_ip_recently_connected_rejects, 1);
3516        });
3517    }
3518
3519    #[test]
3520    fn open_connection_request2_drops_when_server_addr_policy_rejects_port() {
3521        let rt = Builder::new_current_thread()
3522            .enable_all()
3523            .build()
3524            .expect("runtime must build");
3525        rt.block_on(async {
3526            let mut config = TransportConfig {
3527                request2_server_addr_policy: Request2ServerAddrPolicy::PortOnly,
3528                ..TransportConfig::default()
3529            };
3530            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3531                .await
3532                .expect("bind should succeed");
3533            config.bind_addr = socket.local_addr().expect("local addr should be available");
3534            let mut server =
3535                TransportServer::with_socket(config, socket).expect("server should build");
3536            let addr = "127.0.0.1:20312"
3537                .parse::<SocketAddr>()
3538                .expect("valid socket addr");
3539
3540            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3541                protocol_version: RAKNET_PROTOCOL_VERSION,
3542                mtu: 1200,
3543                magic: DEFAULT_UNCONNECTED_MAGIC,
3544            });
3545            let event = server
3546                .handle_offline_packet(addr, &request1, Instant::now())
3547                .await
3548                .expect("request1 handling should succeed");
3549            assert!(event.is_none());
3550            assert!(
3551                server.pending_handshakes.contains_key(&addr),
3552                "request1 should create pending handshake"
3553            );
3554
3555            let local_addr = server.local_addr().expect("local addr should be available");
3556            let wrong_port = local_addr.port().saturating_add(1);
3557            let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3558                server_addr: SocketAddr::from(([203, 0, 113, 10], wrong_port)),
3559                mtu: 1200,
3560                client_guid: 0x0123_4567_89AB_CDEF,
3561                cookie: None,
3562                client_proof: false,
3563                parse_path: Request2ParsePath::StrictNoCookie,
3564                magic: DEFAULT_UNCONNECTED_MAGIC,
3565            });
3566
3567            let event = server
3568                .handle_offline_packet(addr, &request2, Instant::now())
3569                .await
3570                .expect("request2 handling should succeed");
3571            assert!(event.is_none(), "policy mismatch should be silent drop");
3572            assert!(
3573                server.pending_handshakes.contains_key(&addr),
3574                "policy mismatch must keep pending handshake for retry"
3575            );
3576
3577            let metrics = server.metrics_snapshot();
3578            assert_eq!(metrics.request2_server_addr_mismatch_drops, 1);
3579            assert_eq!(metrics.handshake_stage_cancel_drops, 1);
3580            assert_eq!(metrics.handshake_missing_req1_drops, 0);
3581        });
3582    }
3583
3584    #[test]
3585    fn open_connection_request2_accepts_mismatch_when_policy_disabled() {
3586        let rt = Builder::new_current_thread()
3587            .enable_all()
3588            .build()
3589            .expect("runtime must build");
3590        rt.block_on(async {
3591            let mut config = TransportConfig {
3592                request2_server_addr_policy: Request2ServerAddrPolicy::Disabled,
3593                ..TransportConfig::default()
3594            };
3595            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3596                .await
3597                .expect("bind should succeed");
3598            config.bind_addr = socket.local_addr().expect("local addr should be available");
3599            let mut server =
3600                TransportServer::with_socket(config, socket).expect("server should build");
3601            let addr = "127.0.0.1:20313"
3602                .parse::<SocketAddr>()
3603                .expect("valid socket addr");
3604
3605            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3606                protocol_version: RAKNET_PROTOCOL_VERSION,
3607                mtu: 1200,
3608                magic: DEFAULT_UNCONNECTED_MAGIC,
3609            });
3610            server
3611                .handle_offline_packet(addr, &request1, Instant::now())
3612                .await
3613                .expect("request1 handling should succeed");
3614
3615            let local_addr = server.local_addr().expect("local addr should be available");
3616            let wrong_port = local_addr.port().saturating_add(1);
3617            let cookie = server.generate_cookie(addr);
3618            let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3619                server_addr: SocketAddr::from(([203, 0, 113, 10], wrong_port)),
3620                mtu: 1200,
3621                client_guid: 0x1020_3040_5060_7080,
3622                cookie: Some(cookie),
3623                client_proof: true,
3624                parse_path: Request2ParsePath::StrictWithCookie,
3625                magic: DEFAULT_UNCONNECTED_MAGIC,
3626            });
3627
3628            let event = server
3629                .handle_offline_packet(addr, &request2, Instant::now())
3630                .await
3631                .expect("request2 handling should succeed");
3632            assert!(event.is_none(), "successful request2 should not emit event");
3633
3634            assert!(
3635                server.pending_handshakes.contains_key(&addr),
3636                "accepted request2 must keep pending handshake until connected"
3637            );
3638            let session = server
3639                .sessions
3640                .get(&addr)
3641                .expect("accepted request2 must keep session");
3642            assert_eq!(session.state(), SessionState::Reply2Sent);
3643
3644            let metrics = server.metrics_snapshot();
3645            assert_eq!(metrics.request2_server_addr_mismatch_drops, 0);
3646        });
3647    }
3648
3649    #[test]
3650    fn open_connection_request2_retry_resends_reply2_without_missing_req1_penalty() {
3651        let rt = Builder::new_current_thread()
3652            .enable_all()
3653            .build()
3654            .expect("runtime must build");
3655        rt.block_on(async {
3656            let mut config = TransportConfig::default();
3657            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
3658                .await
3659                .expect("bind should succeed");
3660            config.bind_addr = socket.local_addr().expect("local addr should be available");
3661            let mut server =
3662                TransportServer::with_socket(config, socket).expect("server should build");
3663            let addr = "127.0.0.1:20314"
3664                .parse::<SocketAddr>()
3665                .expect("valid socket addr");
3666
3667            let request1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
3668                protocol_version: RAKNET_PROTOCOL_VERSION,
3669                mtu: 1200,
3670                magic: DEFAULT_UNCONNECTED_MAGIC,
3671            });
3672            server
3673                .handle_offline_packet(addr, &request1, Instant::now())
3674                .await
3675                .expect("request1 handling should succeed");
3676
3677            let cookie = server.generate_cookie(addr);
3678            let request2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
3679                server_addr: server.local_addr().expect("local addr should be available"),
3680                mtu: 1200,
3681                client_guid: 0x1111_2222_3333_4444,
3682                cookie: Some(cookie),
3683                client_proof: true,
3684                parse_path: Request2ParsePath::StrictWithCookie,
3685                magic: DEFAULT_UNCONNECTED_MAGIC,
3686            });
3687
3688            let first = server
3689                .handle_offline_packet(addr, &request2, Instant::now())
3690                .await
3691                .expect("first request2 should succeed");
3692            assert!(first.is_none());
3693
3694            let second = server
3695                .handle_offline_packet(addr, &request2, Instant::now() + Duration::from_millis(10))
3696                .await
3697                .expect("retry request2 should succeed");
3698            assert!(second.is_none());
3699
3700            let pending = server
3701                .pending_handshakes
3702                .get(&addr)
3703                .copied()
3704                .expect("pending handshake should remain until connected");
3705            assert_eq!(
3706                pending.stage,
3707                PendingHandshakeStage::AwaitingConnectionRequest
3708            );
3709
3710            let session = server
3711                .sessions
3712                .get(&addr)
3713                .expect("session should exist during post-reply2 handshake stage");
3714            assert_eq!(session.state(), SessionState::Reply2Sent);
3715
3716            let metrics = server.metrics_snapshot();
3717            assert_eq!(metrics.handshake_missing_req1_drops, 0);
3718            assert_eq!(metrics.handshake_stage_cancel_drops, 0);
3719        });
3720    }
3721
3722    #[test]
3723    fn bind_shards_requires_reuse_port_when_count_is_greater_than_one() {
3724        let config = TransportConfig {
3725            reuse_port: false,
3726            ..TransportConfig::default()
3727        };
3728        let rt = Builder::new_current_thread()
3729            .enable_all()
3730            .build()
3731            .expect("runtime must build");
3732        match rt.block_on(TransportServer::bind_shards(config, 2)) {
3733            Ok(_) => panic!("should reject shard_count > 1 without reuse_port"),
3734            Err(err) => assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput),
3735        }
3736    }
3737
3738    #[test]
3739    fn bind_shard_plan_enables_both_families_for_split_mode() {
3740        let config = TransportConfig {
3741            bind_addr: SocketAddr::from(([0, 0, 0, 0], 19132)),
3742            split_ipv4_ipv6_bind: true,
3743            ..TransportConfig::default()
3744        };
3745        let plan = TransportServer::build_shard_bind_plan(&config, 1);
3746        let expected_v4 = SocketAddr::from(([0, 0, 0, 0], 19132));
3747        let expected_v6 = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 19132, 0, 0));
3748        assert_eq!(plan.len(), 2);
3749        assert!(plan.contains(&expected_v4));
3750        assert!(plan.contains(&expected_v6));
3751    }
3752
3753    #[test]
3754    fn bind_shard_plan_round_robins_families_when_split_mode_exceeds_two_workers() {
3755        let config = TransportConfig {
3756            bind_addr: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 19132)),
3757            split_ipv4_ipv6_bind: true,
3758            ..TransportConfig::default()
3759        };
3760        let plan = TransportServer::build_shard_bind_plan(&config, 3);
3761        let expected_v4 = SocketAddr::from(([0, 0, 0, 0], 19132));
3762        let expected_v6 = SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 19132, 0, 0));
3763        assert_eq!(plan, vec![expected_v4, expected_v6, expected_v4]);
3764        assert!(TransportServer::has_duplicate_bind_targets(&plan));
3765    }
3766
/// Asking for three shards in split IPv4/IPv6 mode while `reuse_port` is
/// disabled must fail with `InvalidInput` instead of producing workers.
#[test]
fn bind_shards_split_mode_rejects_duplicate_workers_without_reuse_port() {
    let split_without_reuse = TransportConfig {
        reuse_port: false,
        split_ipv4_ipv6_bind: true,
        ..TransportConfig::default()
    };
    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime must build");

    let outcome = runtime.block_on(TransportServer::bind_shards(split_without_reuse, 3));

    if let Err(rejection) = outcome {
        assert_eq!(rejection.kind(), std::io::ErrorKind::InvalidInput);
    } else {
        panic!("should reject duplicate split bind targets without reuse_port");
    }
}
3783
/// The single-socket `bind` entry point must refuse a configuration that
/// enables split IPv4/IPv6 binding, reporting `InvalidInput`.
#[test]
fn bind_rejects_split_mode_in_single_socket_path() {
    let split_config = TransportConfig {
        split_ipv4_ipv6_bind: true,
        ..TransportConfig::default()
    };
    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime must build");

    // Pull the error out first so the kind check reads as a plain assertion.
    let bind_error = match runtime.block_on(TransportServer::bind(split_config)) {
        Err(bind_error) => bind_error,
        Ok(_) => panic!("single-socket bind must reject split mode"),
    };
    assert_eq!(bind_error.kind(), std::io::ErrorKind::InvalidInput);
}
3799
/// Binding with an explicit socket tuning profile (receive/send buffer sizes,
/// IPv4 TTL and TOS) must succeed, and the resulting server must be able to
/// report its local address.
#[test]
#[cfg(any(
    target_os = "linux",
    target_os = "android",
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd"
))]
fn bind_accepts_socket_tuning_profile() {
    // Every tuning field is spelled out so the profile under test is explicit.
    let tuning_profile = TransportSocketTuning {
        recv_buffer_size: Some(512 * 1024),
        send_buffer_size: Some(512 * 1024),
        ipv4_ttl: Some(64),
        ipv4_tos: Some(0x10),
        ipv6_unicast_hops: None,
        disable_ip_fragmentation: false,
    };
    let tuned_config = TransportConfig {
        socket_tuning: tuning_profile,
        ..TransportConfig::default()
    };

    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime must build");
    let bind_result = runtime.block_on(TransportServer::bind(tuned_config));
    let server = bind_result.expect("bind with valid socket tuning should succeed");

    let _bound_addr = server.local_addr().expect("server must have local addr");
}
3831
/// On Linux and Android targets, enabling `disable_ip_fragmentation` on top of
/// the default tuning must still allow `bind` to succeed and the server to
/// report a local address.
#[test]
#[cfg(any(target_os = "linux", target_os = "android"))]
fn bind_accepts_disable_ip_fragmentation_on_linux_like_targets() {
    let no_fragmentation = TransportSocketTuning {
        disable_ip_fragmentation: true,
        ..TransportSocketTuning::default()
    };
    let tuned_config = TransportConfig {
        socket_tuning: no_fragmentation,
        ..TransportConfig::default()
    };

    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime must build");
    let bind_result = runtime.block_on(TransportServer::bind(tuned_config));
    let server = bind_result.expect("disable_ip_fragmentation should be supported");

    let _bound_addr = server.local_addr().expect("server must have local addr");
}
3851
/// On the targets listed in the `cfg` gate, the server must advertise support
/// for sharded binding via `reuse_port`.
#[test]
#[cfg(any(
    target_os = "linux",
    target_os = "android",
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd"
))]
fn platform_reports_sharded_reuse_port_support() {
    let reuse_port_sharding_supported = TransportServer::supports_reuse_port_sharded_bind();
    assert!(reuse_port_sharding_supported);
}
3865
/// On targets outside the `cfg` list, sharded `reuse_port` binding must be
/// reported as unsupported, yet `bind_shards` with `reuse_port` requested must
/// still return the requested number of workers, all reporting the same local
/// address.
#[test]
#[cfg(not(any(
    target_os = "linux",
    target_os = "android",
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd"
)))]
fn bind_shards_uses_shared_socket_fallback_on_unsupported_platform() {
    assert!(!TransportServer::supports_reuse_port_sharded_bind());

    let reuse_port_config = TransportConfig {
        reuse_port: true,
        ..TransportConfig::default()
    };
    let runtime = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime must build");
    let bind_result = runtime.block_on(TransportServer::bind_shards(reuse_port_config, 2));
    let workers = bind_result.expect("fallback bind should succeed");
    assert_eq!(workers.len(), 2);

    // Every later worker is compared against the first worker's address.
    let first_addr = workers[0].local_addr().expect("worker local addr");
    workers.iter().skip(1).for_each(|worker| {
        let worker_addr = worker.local_addr().expect("worker local addr");
        assert_eq!(worker_addr, first_addr);
    });
}
3896}