Skip to main content

raknet_rust/
client.rs

1//! High-level client API.
2//!
3//! [`RaknetClient`] manages handshake, reliability, keepalive, and event polling
4//! for a single outbound connection to a RakNet server.
5
6use std::collections::VecDeque;
7use std::io;
8use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use bytes::{Bytes, BytesMut};
12use thiserror::Error;
13use tokio::net::UdpSocket;
14use tokio::time::{self, sleep};
15use tracing::{debug, info, warn};
16
17use crate::error::ConfigValidationError;
18use crate::handshake::{
19    OfflinePacket, OpenConnectionReply1, OpenConnectionReply2, OpenConnectionRequest1,
20    OpenConnectionRequest2, Request2ParsePath,
21};
22use crate::protocol::connected::{
23    ConnectedControlPacket, ConnectedPing, ConnectedPong, ConnectionRequest,
24    ConnectionRequestAccepted, DetectLostConnection, DisconnectionNotification,
25    NewIncomingConnection, SYSTEM_ADDRESS_COUNT,
26};
27use crate::protocol::constants::{
28    DEFAULT_UNCONNECTED_MAGIC, MAXIMUM_MTU_SIZE, MINIMUM_MTU_SIZE, MTU_PROBE_ORDER,
29    RAKNET_PROTOCOL_VERSION,
30};
31use crate::protocol::datagram::Datagram;
32use crate::protocol::reliability::Reliability;
33use crate::protocol::sequence24::Sequence24;
34use crate::session::{
35    QueuePayloadResult, RakPriority, Session, SessionMetricsSnapshot, SessionState,
36};
37
38pub type ClientResult<T> = Result<T, RaknetClientError>;
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41/// Send policy used by [`RaknetClient::send_with_options`].
42pub struct ClientSendOptions {
43    /// RakNet reliability class for outgoing payload.
44    pub reliability: Reliability,
45    /// Ordering channel for ordered/sequenced reliabilities.
46    pub channel: u8,
47    /// Session scheduling priority.
48    pub priority: RakPriority,
49}
50
51impl Default for ClientSendOptions {
52    fn default() -> Self {
53        Self {
54            reliability: Reliability::ReliableOrdered,
55            channel: 0,
56            priority: RakPriority::High,
57        }
58    }
59}
60
61#[derive(Debug, Clone)]
62/// Configuration for [`RaknetClient`].
63pub struct RaknetClientConfig {
64    pub local_addr: Option<SocketAddr>,
65    pub guid: u64,
66    pub protocol_version: u8,
67    pub mtu: u16,
68    pub mtu_probe_order: Vec<u16>,
69    pub mtu_probe_attempts_per_step: usize,
70    pub mtu_probe_wait_per_attempt: Duration,
71    pub handshake_timeout: Duration,
72    pub outbound_tick_interval: Duration,
73    pub session_keepalive_interval: Duration,
74    pub session_idle_timeout: Duration,
75    pub recv_buffer_capacity: usize,
76    pub max_new_datagrams_per_tick: usize,
77    pub max_new_bytes_per_tick: usize,
78    pub max_resend_datagrams_per_tick: usize,
79    pub max_resend_bytes_per_tick: usize,
80    pub max_new_datagrams_per_recv: usize,
81    pub max_new_bytes_per_recv: usize,
82    pub max_resend_datagrams_per_recv: usize,
83    pub max_resend_bytes_per_recv: usize,
84}
85
86impl Default for RaknetClientConfig {
87    fn default() -> Self {
88        Self {
89            local_addr: None,
90            guid: random_guid(),
91            protocol_version: RAKNET_PROTOCOL_VERSION,
92            mtu: 1200,
93            mtu_probe_order: MTU_PROBE_ORDER.to_vec(),
94            mtu_probe_attempts_per_step: 2,
95            mtu_probe_wait_per_attempt: Duration::from_millis(350),
96            handshake_timeout: Duration::from_secs(5),
97            outbound_tick_interval: Duration::from_millis(10),
98            session_keepalive_interval: Duration::from_secs(10),
99            session_idle_timeout: Duration::from_secs(30),
100            recv_buffer_capacity: MAXIMUM_MTU_SIZE as usize,
101            max_new_datagrams_per_tick: 8,
102            max_new_bytes_per_tick: 64 * 1024,
103            max_resend_datagrams_per_tick: 8,
104            max_resend_bytes_per_tick: 64 * 1024,
105            max_new_datagrams_per_recv: 6,
106            max_new_bytes_per_recv: 64 * 1024,
107            max_resend_datagrams_per_recv: 6,
108            max_resend_bytes_per_recv: 64 * 1024,
109        }
110    }
111}
112
113impl RaknetClientConfig {
114    /// Validates configuration invariants.
115    pub fn validate(&self) -> Result<(), ConfigValidationError> {
116        if !(MINIMUM_MTU_SIZE..=MAXIMUM_MTU_SIZE).contains(&self.mtu) {
117            return Err(ConfigValidationError::new(
118                "RaknetClientConfig",
119                "mtu",
120                format!(
121                    "must be within [{MINIMUM_MTU_SIZE}, {MAXIMUM_MTU_SIZE}], got {}",
122                    self.mtu
123                ),
124            ));
125        }
126        if self.mtu_probe_attempts_per_step == 0 {
127            return Err(ConfigValidationError::new(
128                "RaknetClientConfig",
129                "mtu_probe_attempts_per_step",
130                "must be >= 1",
131            ));
132        }
133        if self.mtu_probe_wait_per_attempt.is_zero() {
134            return Err(ConfigValidationError::new(
135                "RaknetClientConfig",
136                "mtu_probe_wait_per_attempt",
137                "must be > 0",
138            ));
139        }
140        if self.handshake_timeout.is_zero() {
141            return Err(ConfigValidationError::new(
142                "RaknetClientConfig",
143                "handshake_timeout",
144                "must be > 0",
145            ));
146        }
147        if self.outbound_tick_interval.is_zero() {
148            return Err(ConfigValidationError::new(
149                "RaknetClientConfig",
150                "outbound_tick_interval",
151                "must be > 0",
152            ));
153        }
154        if self.session_keepalive_interval.is_zero() {
155            return Err(ConfigValidationError::new(
156                "RaknetClientConfig",
157                "session_keepalive_interval",
158                "must be > 0",
159            ));
160        }
161        if self.session_idle_timeout.is_zero() {
162            return Err(ConfigValidationError::new(
163                "RaknetClientConfig",
164                "session_idle_timeout",
165                "must be > 0",
166            ));
167        }
168        if self.recv_buffer_capacity < self.mtu as usize {
169            return Err(ConfigValidationError::new(
170                "RaknetClientConfig",
171                "recv_buffer_capacity",
172                format!(
173                    "must be >= mtu ({}), got {}",
174                    self.mtu, self.recv_buffer_capacity
175                ),
176            ));
177        }
178        if self.max_new_datagrams_per_tick == 0 {
179            return Err(ConfigValidationError::new(
180                "RaknetClientConfig",
181                "max_new_datagrams_per_tick",
182                "must be >= 1",
183            ));
184        }
185        if self.max_new_bytes_per_tick < self.mtu as usize {
186            return Err(ConfigValidationError::new(
187                "RaknetClientConfig",
188                "max_new_bytes_per_tick",
189                format!(
190                    "must be >= mtu ({}), got {}",
191                    self.mtu, self.max_new_bytes_per_tick
192                ),
193            ));
194        }
195        if self.max_resend_datagrams_per_tick == 0 {
196            return Err(ConfigValidationError::new(
197                "RaknetClientConfig",
198                "max_resend_datagrams_per_tick",
199                "must be >= 1",
200            ));
201        }
202        if self.max_resend_bytes_per_tick < self.mtu as usize {
203            return Err(ConfigValidationError::new(
204                "RaknetClientConfig",
205                "max_resend_bytes_per_tick",
206                format!(
207                    "must be >= mtu ({}), got {}",
208                    self.mtu, self.max_resend_bytes_per_tick
209                ),
210            ));
211        }
212        if self.max_new_datagrams_per_recv == 0 {
213            return Err(ConfigValidationError::new(
214                "RaknetClientConfig",
215                "max_new_datagrams_per_recv",
216                "must be >= 1",
217            ));
218        }
219        if self.max_new_bytes_per_recv < self.mtu as usize {
220            return Err(ConfigValidationError::new(
221                "RaknetClientConfig",
222                "max_new_bytes_per_recv",
223                format!(
224                    "must be >= mtu ({}), got {}",
225                    self.mtu, self.max_new_bytes_per_recv
226                ),
227            ));
228        }
229        if self.max_resend_datagrams_per_recv == 0 {
230            return Err(ConfigValidationError::new(
231                "RaknetClientConfig",
232                "max_resend_datagrams_per_recv",
233                "must be >= 1",
234            ));
235        }
236        if self.max_resend_bytes_per_recv < self.mtu as usize {
237            return Err(ConfigValidationError::new(
238                "RaknetClientConfig",
239                "max_resend_bytes_per_recv",
240                format!(
241                    "must be >= mtu ({}), got {}",
242                    self.mtu, self.max_resend_bytes_per_recv
243                ),
244            ));
245        }
246
247        Ok(())
248    }
249}
250
251#[derive(Debug, Clone, Copy, PartialEq, Eq)]
252/// Last handshake stage reached before timeout.
253pub enum HandshakeStage {
254    OpenConnectionRequest1,
255    OpenConnectionRequest2,
256    ConnectionRequestAccepted,
257}
258
259#[derive(Debug, Clone, PartialEq, Eq)]
260/// Explicit offline handshake rejection reasons.
261pub enum OfflineRejectionReason {
262    IncompatibleProtocolVersion {
263        protocol_version: u8,
264        server_guid: u64,
265    },
266    ConnectionRequestFailed {
267        server_guid: u64,
268    },
269    AlreadyConnected {
270        server_guid: u64,
271    },
272    NoFreeIncomingConnections {
273        server_guid: u64,
274    },
275    ConnectionBanned {
276        server_guid: u64,
277    },
278    IpRecentlyConnected {
279        server_guid: u64,
280    },
281}
282
283#[derive(Debug, Error, Clone, PartialEq, Eq)]
284/// Errors returned by client connect/send/receive operations.
285pub enum RaknetClientError {
286    #[error("transport io error: {message}")]
287    Io { message: String },
288    #[error("invalid client config: {details}")]
289    InvalidConfig { details: String },
290    #[error("handshake timed out at stage {stage:?}")]
291    HandshakeTimeout { stage: HandshakeStage },
292    #[error("handshake rejected by server: {reason:?}")]
293    OfflineRejected { reason: OfflineRejectionReason },
294    #[error("handshake protocol violation: {details}")]
295    HandshakeProtocolViolation { details: String },
296    #[error("client closed: {reason:?}")]
297    Closed { reason: ClientDisconnectReason },
298    #[error("payload dropped by backpressure")]
299    BackpressureDropped,
300    #[error("payload deferred by backpressure")]
301    BackpressureDeferred,
302    #[error("backpressure requested disconnect")]
303    BackpressureDisconnect,
304}
305
306impl From<io::Error> for RaknetClientError {
307    fn from(value: io::Error) -> Self {
308        Self::Io {
309            message: value.to_string(),
310        }
311    }
312}
313
314impl From<RaknetClientError> for io::Error {
315    fn from(value: RaknetClientError) -> Self {
316        io::Error::other(value.to_string())
317    }
318}
319
320#[derive(Debug, Clone, PartialEq, Eq)]
321/// Reason for client-side disconnection.
322pub enum ClientDisconnectReason {
323    Requested,
324    Backpressure,
325    IdleTimeout,
326    RemoteDisconnectionNotification { reason_code: Option<u8> },
327    RemoteDetectLostConnection,
328    TransportError { message: String },
329}
330
331#[derive(Debug)]
332/// Event stream produced by [`RaknetClient::next_event`].
333pub enum RaknetClientEvent {
334    Connected {
335        server_addr: SocketAddr,
336        mtu: u16,
337    },
338    Packet {
339        payload: Bytes,
340        reliability: Reliability,
341        reliable_index: Option<Sequence24>,
342        sequence_index: Option<Sequence24>,
343        ordering_index: Option<Sequence24>,
344        ordering_channel: Option<u8>,
345    },
346    ReceiptAcked {
347        receipt_id: u64,
348    },
349    DecodeError {
350        error: String,
351    },
352    Disconnected {
353        reason: ClientDisconnectReason,
354    },
355}
356
357#[derive(Debug, Clone)]
358/// Retry policy for [`RaknetClient::connect_with_retry`].
359pub struct ReconnectPolicy {
360    pub max_attempts: usize,
361    pub initial_backoff: Duration,
362    pub max_backoff: Duration,
363    pub retry_on_io: bool,
364    pub retry_on_handshake_timeout: bool,
365    pub fast_fail_on_offline_rejection: bool,
366}
367
368impl Default for ReconnectPolicy {
369    fn default() -> Self {
370        Self {
371            max_attempts: 3,
372            initial_backoff: Duration::from_millis(200),
373            max_backoff: Duration::from_secs(3),
374            retry_on_io: true,
375            retry_on_handshake_timeout: true,
376            fast_fail_on_offline_rejection: true,
377        }
378    }
379}
380
381impl ReconnectPolicy {
382    /// Validates retry policy invariants.
383    pub fn validate(&self) -> Result<(), ConfigValidationError> {
384        if self.max_attempts == 0 {
385            return Err(ConfigValidationError::new(
386                "ReconnectPolicy",
387                "max_attempts",
388                "must be >= 1",
389            ));
390        }
391        if self.initial_backoff > self.max_backoff {
392            return Err(ConfigValidationError::new(
393                "ReconnectPolicy",
394                "initial_backoff",
395                format!(
396                    "must be <= max_backoff ({}ms), got {}ms",
397                    self.max_backoff.as_millis(),
398                    self.initial_backoff.as_millis()
399                ),
400            ));
401        }
402        Ok(())
403    }
404}
405
406/// Single-session high-level RakNet client.
407pub struct RaknetClient {
408    socket: UdpSocket,
409    server_addr: SocketAddr,
410    session: Session,
411    config: RaknetClientConfig,
412    recv_buffer: Vec<u8>,
413    pending_events: VecDeque<RaknetClientEvent>,
414    closed: bool,
415    close_reason: Option<ClientDisconnectReason>,
416    last_inbound_activity: Instant,
417}
418
419impl RaknetClient {
420    /// Connects with default [`RaknetClientConfig`].
421    pub async fn connect(server_addr: SocketAddr) -> ClientResult<Self> {
422        Self::connect_with_config(server_addr, RaknetClientConfig::default()).await
423    }
424
425    /// Connects with retry policy.
426    pub async fn connect_with_retry(
427        server_addr: SocketAddr,
428        config: RaknetClientConfig,
429        policy: ReconnectPolicy,
430    ) -> ClientResult<Self> {
431        policy
432            .validate()
433            .map_err(|error| RaknetClientError::InvalidConfig {
434                details: error.to_string(),
435            })?;
436
437        let mut attempt = 0usize;
438        let max_attempts = policy.max_attempts.max(1);
439        let mut backoff = policy.initial_backoff;
440        let mut last_error: Option<RaknetClientError> = None;
441
442        while attempt < max_attempts {
443            attempt = attempt.saturating_add(1);
444            match Self::connect_with_config(server_addr, config.clone()).await {
445                Ok(client) => return Ok(client),
446                Err(error) => {
447                    let should_retry =
448                        attempt < max_attempts && should_retry_connect(&error, &policy);
449                    last_error = Some(error);
450                    if !should_retry {
451                        break;
452                    }
453
454                    if !backoff.is_zero() {
455                        sleep(backoff).await;
456                    }
457                    backoff = next_backoff(backoff, &policy);
458                }
459            }
460        }
461
462        Err(
463            last_error.unwrap_or(RaknetClientError::HandshakeProtocolViolation {
464                details: "connect_with_retry terminated without a recorded error".to_string(),
465            }),
466        )
467    }
468
469    /// Connects using explicit client configuration.
470    pub async fn connect_with_config(
471        server_addr: SocketAddr,
472        config: RaknetClientConfig,
473    ) -> ClientResult<Self> {
474        config
475            .validate()
476            .map_err(|error| RaknetClientError::InvalidConfig {
477                details: error.to_string(),
478            })?;
479
480        let local_addr = config
481            .local_addr
482            .unwrap_or_else(|| default_bind_addr_for_server(server_addr));
483        info!(%server_addr, %local_addr, "client connecting");
484        let socket = UdpSocket::bind(local_addr)
485            .await
486            .map_err(RaknetClientError::from)?;
487
488        let now = Instant::now();
489        let mut client = Self {
490            socket,
491            server_addr,
492            session: Session::new(config.mtu as usize),
493            recv_buffer: vec![0u8; config.recv_buffer_capacity],
494            config,
495            pending_events: VecDeque::new(),
496            closed: false,
497            close_reason: None,
498            last_inbound_activity: now,
499        };
500
501        client.perform_handshake().await?;
502        info!(
503            %server_addr,
504            mtu = client.session.mtu(),
505            "client handshake established"
506        );
507        client
508            .pending_events
509            .push_back(RaknetClientEvent::Connected {
510                server_addr,
511                mtu: client.session.mtu() as u16,
512            });
513
514        Ok(client)
515    }
516
517    /// Returns local socket address used by this client.
518    pub fn local_addr(&self) -> io::Result<SocketAddr> {
519        self.socket.local_addr()
520    }
521
522    /// Returns remote server address.
523    pub fn server_addr(&self) -> SocketAddr {
524        self.server_addr
525    }
526
527    /// Returns session-level metrics snapshot.
528    pub fn metrics_snapshot(&self) -> SessionMetricsSnapshot {
529        self.session.metrics_snapshot()
530    }
531
532    /// Sends payload with default send options.
533    pub async fn send(&mut self, payload: impl Into<Bytes>) -> ClientResult<()> {
534        self.send_with_options(payload, ClientSendOptions::default())
535            .await
536    }
537
538    /// Sends payload with explicit options.
539    pub async fn send_with_options(
540        &mut self,
541        payload: impl Into<Bytes>,
542        options: ClientSendOptions,
543    ) -> ClientResult<()> {
544        self.ensure_open()?;
545        self.queue_payload_with_optional_receipt(
546            payload.into(),
547            options.reliability,
548            options.channel,
549            options.priority,
550            None,
551        )?;
552        self.flush_outbound_with_limits(
553            self.config.max_new_datagrams_per_recv,
554            self.config.max_new_bytes_per_recv,
555            self.config.max_resend_datagrams_per_recv,
556            self.config.max_resend_bytes_per_recv,
557        )
558        .await
559    }
560
561    /// Sends payload and tracks a receipt id.
562    pub async fn send_with_receipt(
563        &mut self,
564        payload: impl Into<Bytes>,
565        receipt_id: u64,
566        options: ClientSendOptions,
567    ) -> ClientResult<()> {
568        self.ensure_open()?;
569        self.queue_payload_with_optional_receipt(
570            payload.into(),
571            options.reliability,
572            options.channel,
573            options.priority,
574            Some(receipt_id),
575        )?;
576        self.flush_outbound_with_limits(
577            self.config.max_new_datagrams_per_recv,
578            self.config.max_new_bytes_per_recv,
579            self.config.max_resend_datagrams_per_recv,
580            self.config.max_resend_bytes_per_recv,
581        )
582        .await
583    }
584
585    /// Gracefully disconnects client.
586    pub async fn disconnect(&mut self, reason_code: Option<u8>) -> ClientResult<()> {
587        if self.closed {
588            return Ok(());
589        }
590
591        let packet = ConnectedControlPacket::DisconnectionNotification(DisconnectionNotification {
592            reason: reason_code,
593        });
594
595        let queued = self.queue_connected_control_packet(
596            packet,
597            Reliability::ReliableOrdered,
598            0,
599            RakPriority::High,
600        );
601
602        if queued.is_ok() {
603            let _ = self
604                .flush_outbound_with_limits(
605                    self.config.max_new_datagrams_per_tick,
606                    self.config.max_new_bytes_per_tick,
607                    self.config.max_resend_datagrams_per_tick,
608                    self.config.max_resend_bytes_per_tick,
609                )
610                .await;
611        }
612
613        self.finish_close(ClientDisconnectReason::Requested);
614        queued
615    }
616
617    /// Polls next client event.
618    ///
619    /// Returns `None` once client is fully closed and pending events are drained.
620    pub async fn next_event(&mut self) -> Option<RaknetClientEvent> {
621        if let Some(event) = self.pending_events.pop_front() {
622            return Some(event);
623        }
624        if self.closed {
625            return None;
626        }
627
628        loop {
629            if let Some(event) = self.pending_events.pop_front() {
630                return Some(event);
631            }
632            if self.closed {
633                return None;
634            }
635            if self.check_idle_timeout_and_close() {
636                continue;
637            }
638
639            match time::timeout(
640                self.config.outbound_tick_interval,
641                self.socket.recv_from(&mut self.recv_buffer),
642            )
643            .await
644            {
645                Ok(Ok((len, addr))) => {
646                    if addr != self.server_addr {
647                        continue;
648                    }
649                    self.last_inbound_activity = Instant::now();
650
651                    if let Err(error) = self.process_inbound_packet(len).await {
652                        self.finish_close(ClientDisconnectReason::TransportError {
653                            message: format!("inbound processing failed: {error}"),
654                        });
655                    }
656                }
657                Ok(Err(error)) => {
658                    self.finish_close(ClientDisconnectReason::TransportError {
659                        message: format!("udp receive failed: {error}"),
660                    });
661                }
662                Err(_) => {
663                    if let Err(error) = self
664                        .flush_outbound_with_limits(
665                            self.config.max_new_datagrams_per_tick,
666                            self.config.max_new_bytes_per_tick,
667                            self.config.max_resend_datagrams_per_tick,
668                            self.config.max_resend_bytes_per_tick,
669                        )
670                        .await
671                    {
672                        self.finish_close(ClientDisconnectReason::TransportError {
673                            message: format!("outbound tick failed: {error}"),
674                        });
675                    }
676                }
677            }
678        }
679    }
680
681    async fn perform_handshake(&mut self) -> ClientResult<()> {
682        debug!(server_addr = %self.server_addr, "starting client handshake");
683        if !self.session.transition_to(SessionState::Req1Recv) {
684            return Err(RaknetClientError::HandshakeProtocolViolation {
685                details: "session transition failed before request1".to_string(),
686            });
687        }
688
689        let deadline = Instant::now() + self.config.handshake_timeout;
690        let reply1 = self.probe_open_connection_reply1(deadline).await?;
691
692        if !self.session.transition_to(SessionState::Reply1Sent) {
693            return Err(RaknetClientError::HandshakeProtocolViolation {
694                details: "session transition failed after reply1".to_string(),
695            });
696        }
697
698        self.session
699            .set_mtu(reply1.mtu.clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE) as usize);
700
701        if !self.session.transition_to(SessionState::Req2Recv) {
702            return Err(RaknetClientError::HandshakeProtocolViolation {
703                details: "session transition failed before request2".to_string(),
704            });
705        }
706
707        let req2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
708            server_addr: self.server_addr,
709            mtu: reply1.mtu,
710            client_guid: self.config.guid,
711            cookie: reply1.cookie,
712            client_proof: false,
713            parse_path: if reply1.cookie.is_some() {
714                Request2ParsePath::StrictWithCookie
715            } else {
716                Request2ParsePath::StrictNoCookie
717            },
718            magic: DEFAULT_UNCONNECTED_MAGIC,
719        });
720        self.send_offline_packet(&req2).await?;
721
722        let _reply2 = self.wait_for_open_connection_reply2(deadline).await?;
723        if !self.session.transition_to(SessionState::Reply2Sent) {
724            return Err(RaknetClientError::HandshakeProtocolViolation {
725                details: "session transition failed after reply2".to_string(),
726            });
727        }
728
729        if !self.session.transition_to(SessionState::ConnReqRecv) {
730            return Err(RaknetClientError::HandshakeProtocolViolation {
731                details: "session transition failed before connection request".to_string(),
732            });
733        }
734
735        let request_time = unix_timestamp_millis();
736        self.queue_connected_control_packet(
737            ConnectedControlPacket::ConnectionRequest(ConnectionRequest {
738                client_guid: self.config.guid,
739                request_time,
740                use_encryption: false,
741            }),
742            Reliability::ReliableOrdered,
743            0,
744            RakPriority::High,
745        )?;
746        self.flush_outbound_with_limits(
747            self.config.max_new_datagrams_per_recv,
748            self.config.max_new_bytes_per_recv,
749            self.config.max_resend_datagrams_per_recv,
750            self.config.max_resend_bytes_per_recv,
751        )
752        .await?;
753
754        let accepted = self.wait_for_connection_request_accepted(deadline).await?;
755
756        if !self
757            .session
758            .transition_to(SessionState::ConnReqAcceptedSent)
759        {
760            return Err(RaknetClientError::HandshakeProtocolViolation {
761                details: "session transition failed after request accepted".to_string(),
762            });
763        }
764
765        if !self.session.transition_to(SessionState::NewIncomingRecv) {
766            return Err(RaknetClientError::HandshakeProtocolViolation {
767                details: "session transition failed before new incoming".to_string(),
768            });
769        }
770
771        self.queue_connected_control_packet(
772            ConnectedControlPacket::NewIncomingConnection(NewIncomingConnection {
773                server_addr: self.server_addr,
774                internal_addrs: build_internal_addrs(self.server_addr),
775                request_time: accepted.request_time,
776                accepted_time: accepted.accepted_time,
777            }),
778            Reliability::ReliableOrdered,
779            0,
780            RakPriority::High,
781        )?;
782
783        self.flush_outbound_with_limits(
784            self.config.max_new_datagrams_per_recv,
785            self.config.max_new_bytes_per_recv,
786            self.config.max_resend_datagrams_per_recv,
787            self.config.max_resend_bytes_per_recv,
788        )
789        .await?;
790
791        if !self.session.transition_to(SessionState::Connected) {
792            return Err(RaknetClientError::HandshakeProtocolViolation {
793                details: "session transition failed to connected".to_string(),
794            });
795        }
796
797        debug!(server_addr = %self.server_addr, "client handshake completed");
798        self.last_inbound_activity = Instant::now();
799        Ok(())
800    }
801
802    async fn probe_open_connection_reply1(
803        &mut self,
804        overall_deadline: Instant,
805    ) -> ClientResult<OpenConnectionReply1> {
806        let candidates = self.mtu_probe_candidates();
807        for mtu in candidates {
808            for _ in 0..self.config.mtu_probe_attempts_per_step {
809                if Instant::now() >= overall_deadline {
810                    warn!(
811                        server_addr = %self.server_addr,
812                        stage = ?HandshakeStage::OpenConnectionRequest1,
813                        "client handshake timed out"
814                    );
815                    return Err(RaknetClientError::HandshakeTimeout {
816                        stage: HandshakeStage::OpenConnectionRequest1,
817                    });
818                }
819
820                let req1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
821                    protocol_version: self.config.protocol_version,
822                    mtu,
823                    magic: DEFAULT_UNCONNECTED_MAGIC,
824                });
825                self.send_offline_packet(&req1).await?;
826
827                let per_attempt_deadline = std::cmp::min(
828                    overall_deadline,
829                    Instant::now() + self.config.mtu_probe_wait_per_attempt,
830                );
831
832                if let Some(reply) = self
833                    .wait_for_open_connection_reply1_in_window(per_attempt_deadline)
834                    .await?
835                {
836                    return Ok(reply);
837                }
838            }
839        }
840
841        warn!(
842            server_addr = %self.server_addr,
843            stage = ?HandshakeStage::OpenConnectionRequest1,
844            "client handshake timed out after mtu probing"
845        );
846        Err(RaknetClientError::HandshakeTimeout {
847            stage: HandshakeStage::OpenConnectionRequest1,
848        })
849    }
850
851    fn mtu_probe_candidates(&self) -> Vec<u16> {
852        let mut out = Vec::new();
853        for candidate in self
854            .config
855            .mtu_probe_order
856            .iter()
857            .copied()
858            .chain([self.config.mtu])
859        {
860            let mtu = candidate
861                .clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE)
862                .max(MINIMUM_MTU_SIZE);
863            if !out.contains(&mtu) {
864                out.push(mtu);
865            }
866        }
867
868        if out.is_empty() {
869            out.push(self.config.mtu);
870        }
871
872        out
873    }
874
875    async fn wait_for_open_connection_reply1_in_window(
876        &mut self,
877        deadline: Instant,
878    ) -> ClientResult<Option<OpenConnectionReply1>> {
879        loop {
880            let packet = match self.recv_packet_until(deadline).await? {
881                Some(packet) => packet,
882                None => return Ok(None),
883            };
884
885            let mut src = &packet[..];
886            let Ok(offline) = OfflinePacket::decode(&mut src) else {
887                continue;
888            };
889
890            if let Some(reason) = offline_rejection_reason(&offline) {
891                return Err(RaknetClientError::OfflineRejected { reason });
892            }
893
894            if let OfflinePacket::OpenConnectionReply1(reply) = offline {
895                return Ok(Some(reply));
896            }
897        }
898    }
899
900    async fn wait_for_open_connection_reply2(
901        &mut self,
902        deadline: Instant,
903    ) -> ClientResult<OpenConnectionReply2> {
904        loop {
905            let packet = match self.recv_packet_until(deadline).await? {
906                Some(packet) => packet,
907                None => {
908                    warn!(
909                        server_addr = %self.server_addr,
910                        stage = ?HandshakeStage::OpenConnectionRequest2,
911                        "client handshake timed out waiting for reply2"
912                    );
913                    return Err(RaknetClientError::HandshakeTimeout {
914                        stage: HandshakeStage::OpenConnectionRequest2,
915                    });
916                }
917            };
918
919            let mut src = &packet[..];
920            let Ok(offline) = OfflinePacket::decode(&mut src) else {
921                continue;
922            };
923
924            if let Some(reason) = offline_rejection_reason(&offline) {
925                return Err(RaknetClientError::OfflineRejected { reason });
926            }
927
928            if let OfflinePacket::OpenConnectionReply2(reply) = offline {
929                return Ok(reply);
930            }
931        }
932    }
933
934    async fn wait_for_connection_request_accepted(
935        &mut self,
936        deadline: Instant,
937    ) -> ClientResult<ConnectionRequestAccepted> {
938        loop {
939            let packet = match self.recv_packet_until(deadline).await? {
940                Some(packet) => packet,
941                None => {
942                    warn!(
943                        server_addr = %self.server_addr,
944                        stage = ?HandshakeStage::ConnectionRequestAccepted,
945                        "client handshake timed out waiting for request accepted"
946                    );
947                    return Err(RaknetClientError::HandshakeTimeout {
948                        stage: HandshakeStage::ConnectionRequestAccepted,
949                    });
950                }
951            };
952
953            let mut offline_src = &packet[..];
954            if let Ok(offline) = OfflinePacket::decode(&mut offline_src) {
955                if let Some(reason) = offline_rejection_reason(&offline) {
956                    return Err(RaknetClientError::OfflineRejected { reason });
957                }
958                continue;
959            }
960
961            let mut src = &packet[..];
962            let datagram = match Datagram::decode(&mut src) {
963                Ok(datagram) => datagram,
964                Err(_) => continue,
965            };
966
967            if let Some(accepted) = self.process_handshake_datagram(datagram).await? {
968                return Ok(accepted);
969            }
970        }
971    }
972
973    async fn process_handshake_datagram(
974        &mut self,
975        datagram: Datagram,
976    ) -> ClientResult<Option<ConnectionRequestAccepted>> {
977        let now = Instant::now();
978        let frames = self
979            .session
980            .ingest_datagram(datagram, now)
981            .map_err(invalid_data_client_error)?;
982        let _ = self.session.process_incoming_receipts(now);
983
984        let mut accepted = None;
985        for frame in frames {
986            let Some(first) = frame.payload.first().copied() else {
987                continue;
988            };
989            if !is_connected_control_id(first) {
990                continue;
991            }
992
993            let mut control_payload = &frame.payload[..];
994            let control =
995                ConnectedControlPacket::decode(&mut control_payload).map_err(|error| {
996                    RaknetClientError::HandshakeProtocolViolation {
997                        details: format!("failed to decode handshake control packet: {error}"),
998                    }
999                })?;
1000
1001            match control {
1002                ConnectedControlPacket::ConnectionRequestAccepted(pkt) => {
1003                    accepted = Some(pkt);
1004                }
1005                ConnectedControlPacket::ConnectedPing(ping) => {
1006                    self.queue_connected_control_packet(
1007                        ConnectedControlPacket::ConnectedPong(ConnectedPong {
1008                            ping_time: ping.ping_time,
1009                            pong_time: unix_timestamp_millis(),
1010                        }),
1011                        Reliability::Unreliable,
1012                        0,
1013                        RakPriority::Immediate,
1014                    )?;
1015                }
1016                ConnectedControlPacket::DisconnectionNotification(pkt) => {
1017                    return Err(RaknetClientError::Closed {
1018                        reason: ClientDisconnectReason::RemoteDisconnectionNotification {
1019                            reason_code: pkt.reason,
1020                        },
1021                    });
1022                }
1023                ConnectedControlPacket::DetectLostConnection(_) => {
1024                    return Err(RaknetClientError::Closed {
1025                        reason: ClientDisconnectReason::RemoteDetectLostConnection,
1026                    });
1027                }
1028                ConnectedControlPacket::ConnectedPong(_)
1029                | ConnectedControlPacket::ConnectionRequest(_)
1030                | ConnectedControlPacket::NewIncomingConnection(_) => {}
1031            }
1032        }
1033
1034        self.flush_outbound_with_limits(
1035            self.config.max_new_datagrams_per_recv,
1036            self.config.max_new_bytes_per_recv,
1037            self.config.max_resend_datagrams_per_recv,
1038            self.config.max_resend_bytes_per_recv,
1039        )
1040        .await?;
1041
1042        Ok(accepted)
1043    }
1044
1045    async fn recv_packet_until(&mut self, deadline: Instant) -> ClientResult<Option<Vec<u8>>> {
1046        loop {
1047            let remaining = deadline.saturating_duration_since(Instant::now());
1048            if remaining.is_zero() {
1049                return Ok(None);
1050            }
1051
1052            let recv = match time::timeout(remaining, self.socket.recv_from(&mut self.recv_buffer))
1053                .await
1054            {
1055                Ok(result) => result,
1056                Err(_) => return Ok(None),
1057            }
1058            .map_err(RaknetClientError::from)?;
1059
1060            let (len, addr) = recv;
1061            if addr != self.server_addr {
1062                continue;
1063            }
1064
1065            let mut packet = Vec::with_capacity(len);
1066            packet.extend_from_slice(&self.recv_buffer[..len]);
1067            return Ok(Some(packet));
1068        }
1069    }
1070
1071    async fn process_inbound_packet(&mut self, len: usize) -> ClientResult<()> {
1072        let payload = &self.recv_buffer[..len];
1073        let Some(first) = payload.first().copied() else {
1074            return Ok(());
1075        };
1076
1077        if is_offline_packet_id(first) {
1078            self.pending_events
1079                .push_back(RaknetClientEvent::DecodeError {
1080                    error: format!("unexpected offline packet id while connected: 0x{first:02x}"),
1081                });
1082            return Ok(());
1083        }
1084
1085        let mut src = payload;
1086        let datagram = match Datagram::decode(&mut src) {
1087            Ok(datagram) => datagram,
1088            Err(error) => {
1089                self.pending_events
1090                    .push_back(RaknetClientEvent::DecodeError {
1091                        error: error.to_string(),
1092                    });
1093                return Ok(());
1094            }
1095        };
1096
1097        self.process_connected_datagram(datagram).await
1098    }
1099
1100    async fn process_connected_datagram(&mut self, datagram: Datagram) -> ClientResult<()> {
1101        let now = Instant::now();
1102        let frames = match self.session.ingest_datagram(datagram, now) {
1103            Ok(frames) => frames,
1104            Err(error) => {
1105                self.pending_events
1106                    .push_back(RaknetClientEvent::DecodeError {
1107                        error: error.to_string(),
1108                    });
1109                return Ok(());
1110            }
1111        };
1112
1113        let receipts = self.session.process_incoming_receipts(now);
1114        for receipt_id in receipts.acked_receipt_ids {
1115            self.pending_events
1116                .push_back(RaknetClientEvent::ReceiptAcked { receipt_id });
1117        }
1118
1119        for frame in frames {
1120            let Some(first) = frame.payload.first().copied() else {
1121                continue;
1122            };
1123
1124            if is_connected_control_id(first) {
1125                let mut control_payload = &frame.payload[..];
1126                let control = match ConnectedControlPacket::decode(&mut control_payload) {
1127                    Ok(control) => control,
1128                    Err(error) => {
1129                        self.pending_events
1130                            .push_back(RaknetClientEvent::DecodeError {
1131                                error: error.to_string(),
1132                            });
1133                        continue;
1134                    }
1135                };
1136
1137                self.apply_connected_control(control)?;
1138                continue;
1139            }
1140
1141            self.pending_events.push_back(RaknetClientEvent::Packet {
1142                payload: frame.payload,
1143                reliability: frame.header.reliability,
1144                reliable_index: frame.reliable_index,
1145                sequence_index: frame.sequence_index,
1146                ordering_index: frame.ordering_index,
1147                ordering_channel: frame.ordering_channel,
1148            });
1149        }
1150
1151        // Client facade is poll-driven; make sure control ACK/NACK can flush
1152        // immediately on this receive path without waiting for a later poll tick.
1153        self.session.force_control_flush_deadlines(now);
1154
1155        self.flush_outbound_with_limits(
1156            self.config.max_new_datagrams_per_recv,
1157            self.config.max_new_bytes_per_recv,
1158            self.config.max_resend_datagrams_per_recv,
1159            self.config.max_resend_bytes_per_recv,
1160        )
1161        .await
1162    }
1163
1164    fn apply_connected_control(&mut self, control: ConnectedControlPacket) -> ClientResult<()> {
1165        match control {
1166            ConnectedControlPacket::ConnectedPing(ping) => {
1167                self.queue_connected_control_packet(
1168                    ConnectedControlPacket::ConnectedPong(ConnectedPong {
1169                        ping_time: ping.ping_time,
1170                        pong_time: unix_timestamp_millis(),
1171                    }),
1172                    Reliability::Unreliable,
1173                    0,
1174                    RakPriority::Immediate,
1175                )?;
1176            }
1177            ConnectedControlPacket::DisconnectionNotification(pkt) => {
1178                self.finish_close(ClientDisconnectReason::RemoteDisconnectionNotification {
1179                    reason_code: pkt.reason,
1180                });
1181            }
1182            ConnectedControlPacket::DetectLostConnection(DetectLostConnection) => {
1183                self.finish_close(ClientDisconnectReason::RemoteDetectLostConnection);
1184            }
1185            ConnectedControlPacket::ConnectionRequest(_)
1186            | ConnectedControlPacket::ConnectionRequestAccepted(_)
1187            | ConnectedControlPacket::NewIncomingConnection(_)
1188            | ConnectedControlPacket::ConnectedPong(_) => {}
1189        }
1190
1191        Ok(())
1192    }
1193
1194    fn queue_connected_control_packet(
1195        &mut self,
1196        packet: ConnectedControlPacket,
1197        reliability: Reliability,
1198        channel: u8,
1199        priority: RakPriority,
1200    ) -> ClientResult<()> {
1201        let mut out = BytesMut::new();
1202        packet.encode(&mut out).map_err(invalid_data_client_error)?;
1203        self.queue_payload_with_optional_receipt(out.freeze(), reliability, channel, priority, None)
1204    }
1205
1206    fn queue_payload_with_optional_receipt(
1207        &mut self,
1208        payload: Bytes,
1209        reliability: Reliability,
1210        channel: u8,
1211        priority: RakPriority,
1212        receipt_id: Option<u64>,
1213    ) -> ClientResult<()> {
1214        let decision = if let Some(receipt_id) = receipt_id {
1215            self.session.queue_payload_with_receipt(
1216                payload,
1217                reliability,
1218                channel,
1219                priority,
1220                Some(receipt_id),
1221            )
1222        } else {
1223            self.session
1224                .queue_payload(payload, reliability, channel, priority)
1225        };
1226
1227        match decision {
1228            QueuePayloadResult::Enqueued { .. } => Ok(()),
1229            QueuePayloadResult::Dropped => Err(RaknetClientError::BackpressureDropped),
1230            QueuePayloadResult::Deferred => Err(RaknetClientError::BackpressureDeferred),
1231            QueuePayloadResult::DisconnectRequested => {
1232                self.finish_close(ClientDisconnectReason::Backpressure);
1233                Err(RaknetClientError::BackpressureDisconnect)
1234            }
1235        }
1236    }
1237
1238    async fn flush_outbound_with_limits(
1239        &mut self,
1240        max_new_datagrams: usize,
1241        max_new_bytes: usize,
1242        max_resend_datagrams: usize,
1243        max_resend_bytes: usize,
1244    ) -> ClientResult<()> {
1245        if self.closed {
1246            return Ok(());
1247        }
1248
1249        self.queue_keepalive_ping();
1250
1251        let now = Instant::now();
1252        let datagrams = self.session.on_tick(
1253            now,
1254            max_new_datagrams,
1255            max_new_bytes,
1256            max_resend_datagrams,
1257            max_resend_bytes,
1258        );
1259
1260        for datagram in &datagrams {
1261            self.send_datagram(datagram).await?;
1262        }
1263
1264        if self.session.take_backpressure_disconnect() {
1265            self.finish_close(ClientDisconnectReason::Backpressure);
1266            return Err(RaknetClientError::BackpressureDisconnect);
1267        }
1268
1269        Ok(())
1270    }
1271
1272    fn queue_keepalive_ping(&mut self) {
1273        let now = Instant::now();
1274        if !self
1275            .session
1276            .should_send_keepalive(now, self.config.session_keepalive_interval)
1277        {
1278            return;
1279        }
1280
1281        let ping = ConnectedControlPacket::ConnectedPing(ConnectedPing {
1282            ping_time: unix_timestamp_millis(),
1283        });
1284
1285        let decision = {
1286            let mut out = BytesMut::new();
1287            if ping.encode(&mut out).is_err() {
1288                return;
1289            }
1290            self.session
1291                .queue_payload(out.freeze(), Reliability::Unreliable, 0, RakPriority::Low)
1292        };
1293
1294        if matches!(decision, QueuePayloadResult::Enqueued { .. }) {
1295            self.session.mark_keepalive_sent(now);
1296        }
1297    }
1298
1299    fn check_idle_timeout_and_close(&mut self) -> bool {
1300        if self.closed
1301            || self.config.session_idle_timeout.is_zero()
1302            || self.session.state() != SessionState::Connected
1303        {
1304            return false;
1305        }
1306
1307        let idle = Instant::now().saturating_duration_since(self.last_inbound_activity);
1308        if idle >= self.config.session_idle_timeout {
1309            self.finish_close(ClientDisconnectReason::IdleTimeout);
1310            return true;
1311        }
1312
1313        false
1314    }
1315
1316    async fn send_offline_packet(&self, packet: &OfflinePacket) -> ClientResult<()> {
1317        let mut out = BytesMut::new();
1318        packet.encode(&mut out).map_err(invalid_data_client_error)?;
1319        let _written = self
1320            .socket
1321            .send_to(&out, self.server_addr)
1322            .await
1323            .map_err(RaknetClientError::from)?;
1324        Ok(())
1325    }
1326
1327    async fn send_datagram(&self, datagram: &Datagram) -> ClientResult<()> {
1328        let mut out = BytesMut::with_capacity(datagram.encoded_size());
1329        datagram
1330            .encode(&mut out)
1331            .map_err(invalid_data_client_error)?;
1332        let _written = self
1333            .socket
1334            .send_to(&out, self.server_addr)
1335            .await
1336            .map_err(RaknetClientError::from)?;
1337        Ok(())
1338    }
1339
1340    fn ensure_open(&self) -> ClientResult<()> {
1341        if self.closed {
1342            return Err(RaknetClientError::Closed {
1343                reason: self.close_reason.clone().unwrap_or(
1344                    ClientDisconnectReason::TransportError {
1345                        message: "closed without explicit reason".to_string(),
1346                    },
1347                ),
1348            });
1349        }
1350        Ok(())
1351    }
1352
1353    fn finish_close(&mut self, reason: ClientDisconnectReason) {
1354        if self.closed {
1355            return;
1356        }
1357
1358        if self.session.state() == SessionState::Connected {
1359            let _ = self.session.transition_to(SessionState::Closing);
1360        }
1361        let _ = self.session.transition_to(SessionState::Closed);
1362
1363        self.close_reason = Some(reason.clone());
1364        self.closed = true;
1365        match &reason {
1366            ClientDisconnectReason::Requested
1367            | ClientDisconnectReason::RemoteDisconnectionNotification { .. }
1368            | ClientDisconnectReason::RemoteDetectLostConnection => {
1369                info!(server_addr = %self.server_addr, ?reason, "client closed")
1370            }
1371            ClientDisconnectReason::Backpressure
1372            | ClientDisconnectReason::IdleTimeout
1373            | ClientDisconnectReason::TransportError { .. } => {
1374                warn!(server_addr = %self.server_addr, ?reason, "client closed")
1375            }
1376        }
1377        self.pending_events
1378            .push_back(RaknetClientEvent::Disconnected { reason });
1379    }
1380}
1381
1382fn should_retry_connect(error: &RaknetClientError, policy: &ReconnectPolicy) -> bool {
1383    match error {
1384        RaknetClientError::OfflineRejected { .. } => !policy.fast_fail_on_offline_rejection,
1385        RaknetClientError::HandshakeTimeout { .. } => policy.retry_on_handshake_timeout,
1386        RaknetClientError::Io { .. } => policy.retry_on_io,
1387        RaknetClientError::InvalidConfig { .. } => false,
1388        RaknetClientError::HandshakeProtocolViolation { .. }
1389        | RaknetClientError::Closed { .. }
1390        | RaknetClientError::BackpressureDropped
1391        | RaknetClientError::BackpressureDeferred
1392        | RaknetClientError::BackpressureDisconnect => false,
1393    }
1394}
1395
1396fn next_backoff(current: Duration, policy: &ReconnectPolicy) -> Duration {
1397    if current.is_zero() {
1398        return Duration::from_millis(1).min(policy.max_backoff);
1399    }
1400
1401    let doubled = current.saturating_mul(2);
1402    if doubled > policy.max_backoff {
1403        policy.max_backoff
1404    } else {
1405        doubled
1406    }
1407}
1408
1409fn invalid_data_client_error<E: std::fmt::Display>(error: E) -> RaknetClientError {
1410    RaknetClientError::HandshakeProtocolViolation {
1411        details: error.to_string(),
1412    }
1413}
1414
1415fn offline_rejection_reason(packet: &OfflinePacket) -> Option<OfflineRejectionReason> {
1416    match packet {
1417        OfflinePacket::IncompatibleProtocolVersion(pkt) => {
1418            Some(OfflineRejectionReason::IncompatibleProtocolVersion {
1419                protocol_version: pkt.protocol_version,
1420                server_guid: pkt.server_guid,
1421            })
1422        }
1423        OfflinePacket::ConnectionRequestFailed(pkt) => {
1424            Some(OfflineRejectionReason::ConnectionRequestFailed {
1425                server_guid: pkt.server_guid,
1426            })
1427        }
1428        OfflinePacket::AlreadyConnected(pkt) => Some(OfflineRejectionReason::AlreadyConnected {
1429            server_guid: pkt.server_guid,
1430        }),
1431        OfflinePacket::NoFreeIncomingConnections(pkt) => {
1432            Some(OfflineRejectionReason::NoFreeIncomingConnections {
1433                server_guid: pkt.server_guid,
1434            })
1435        }
1436        OfflinePacket::ConnectionBanned(pkt) => Some(OfflineRejectionReason::ConnectionBanned {
1437            server_guid: pkt.server_guid,
1438        }),
1439        OfflinePacket::IpRecentlyConnected(pkt) => {
1440            Some(OfflineRejectionReason::IpRecentlyConnected {
1441                server_guid: pkt.server_guid,
1442            })
1443        }
1444        _ => None,
1445    }
1446}
1447
1448fn is_offline_packet_id(id: u8) -> bool {
1449    matches!(
1450        id,
1451        0x01 | 0x02 | 0x05 | 0x06 | 0x07 | 0x08 | 0x11 | 0x12 | 0x14 | 0x17 | 0x19 | 0x1A | 0x1C
1452    )
1453}
1454
1455fn is_connected_control_id(id: u8) -> bool {
1456    matches!(id, 0x00 | 0x03 | 0x04 | 0x09 | 0x10 | 0x13 | 0x15)
1457}
1458
1459fn default_bind_addr_for_server(server_addr: SocketAddr) -> SocketAddr {
1460    match server_addr {
1461        SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
1462        SocketAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(
1463            Ipv6Addr::UNSPECIFIED,
1464            0,
1465            0,
1466            v6.scope_id(),
1467        )),
1468    }
1469}
1470
1471fn build_internal_addrs(server_addr: SocketAddr) -> [SocketAddr; SYSTEM_ADDRESS_COUNT] {
1472    let fallback = match server_addr {
1473        SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
1474        SocketAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(
1475            Ipv6Addr::UNSPECIFIED,
1476            0,
1477            0,
1478            v6.scope_id(),
1479        )),
1480    };
1481
1482    let mut addrs = [fallback; SYSTEM_ADDRESS_COUNT];
1483    addrs[0] = server_addr;
1484    addrs
1485}
1486
1487fn random_guid() -> u64 {
1488    let mut bytes = [0u8; 8];
1489    if getrandom::getrandom(&mut bytes).is_ok() {
1490        return u64::from_le_bytes(bytes);
1491    }
1492
1493    let now = unix_timestamp_millis() as u64;
1494    now ^ 0xA5A5_5A5A_DEAD_BEEF
1495}
1496
1497fn unix_timestamp_millis() -> i64 {
1498    match SystemTime::now().duration_since(UNIX_EPOCH) {
1499        Ok(duration) => duration.as_millis().min(i64::MAX as u128) as i64,
1500        Err(_) => 0,
1501    }
1502}
1503
1504#[cfg(test)]
1505mod tests {
1506    use super::{
1507        ClientSendOptions, OfflineRejectionReason, RaknetClientConfig, RaknetClientError,
1508        ReconnectPolicy, is_connected_control_id, is_offline_packet_id, next_backoff,
1509        offline_rejection_reason, should_retry_connect,
1510    };
1511    use crate::handshake::{ConnectionBanned, OfflinePacket};
1512    use crate::protocol::reliability::Reliability;
1513    use crate::session::RakPriority;
1514    use std::time::Duration;
1515
1516    #[test]
1517    fn id_guards_cover_known_ranges() {
1518        assert!(is_offline_packet_id(0x05));
1519        assert!(is_offline_packet_id(0x1C));
1520        assert!(!is_offline_packet_id(0xFF));
1521
1522        assert!(is_connected_control_id(0x10));
1523        assert!(!is_connected_control_id(0x11));
1524    }
1525
1526    #[test]
1527    fn send_options_default_matches_server_defaults() {
1528        let options = ClientSendOptions::default();
1529        assert_eq!(options.reliability, Reliability::ReliableOrdered);
1530        assert_eq!(options.channel, 0);
1531        assert_eq!(options.priority, RakPriority::High);
1532    }
1533
1534    #[test]
1535    fn rejection_mapping_extracts_reason() {
1536        let packet = OfflinePacket::ConnectionBanned(ConnectionBanned {
1537            server_guid: 7,
1538            magic: crate::protocol::constants::DEFAULT_UNCONNECTED_MAGIC,
1539        });
1540        assert_eq!(
1541            offline_rejection_reason(&packet),
1542            Some(OfflineRejectionReason::ConnectionBanned { server_guid: 7 })
1543        );
1544    }
1545
1546    #[test]
1547    fn retry_policy_fast_fail_respects_offline_rejection() {
1548        let err = RaknetClientError::OfflineRejected {
1549            reason: OfflineRejectionReason::ConnectionBanned { server_guid: 1 },
1550        };
1551        let policy = ReconnectPolicy::default();
1552        assert!(!should_retry_connect(&err, &policy));
1553
1554        let mut relaxed = policy;
1555        relaxed.fast_fail_on_offline_rejection = false;
1556        assert!(should_retry_connect(&err, &relaxed));
1557    }
1558
1559    #[test]
1560    fn backoff_growth_respects_cap() {
1561        let policy = ReconnectPolicy {
1562            initial_backoff: Duration::from_millis(100),
1563            max_backoff: Duration::from_millis(250),
1564            ..ReconnectPolicy::default()
1565        };
1566        assert_eq!(
1567            next_backoff(Duration::from_millis(100), &policy),
1568            Duration::from_millis(200)
1569        );
1570        assert_eq!(
1571            next_backoff(Duration::from_millis(200), &policy),
1572            Duration::from_millis(250)
1573        );
1574    }
1575
1576    #[test]
1577    fn mtu_probe_candidates_include_configured_mtu_even_when_order_empty() {
1578        let cfg = RaknetClientConfig {
1579            mtu: 1300,
1580            mtu_probe_order: Vec::new(),
1581            ..RaknetClientConfig::default()
1582        };
1583        assert_eq!(cfg.mtu, 1300);
1584        cfg.validate().expect("config should be valid");
1585    }
1586
1587    #[test]
1588    fn client_config_validate_rejects_out_of_range_mtu() {
1589        let cfg = RaknetClientConfig {
1590            mtu: 10,
1591            ..RaknetClientConfig::default()
1592        };
1593        let err = cfg
1594            .validate()
1595            .expect_err("invalid MTU must be rejected by validate()");
1596        assert_eq!(err.config, "RaknetClientConfig");
1597        assert_eq!(err.field, "mtu");
1598    }
1599
1600    #[test]
1601    fn reconnect_policy_validate_rejects_zero_attempts() {
1602        let policy = ReconnectPolicy {
1603            max_attempts: 0,
1604            ..ReconnectPolicy::default()
1605        };
1606        let err = policy
1607            .validate()
1608            .expect_err("max_attempts=0 must be rejected");
1609        assert_eq!(err.config, "ReconnectPolicy");
1610        assert_eq!(err.field, "max_attempts");
1611    }
1612}