Skip to main content

raknet_rust/
client.rs

1use std::collections::VecDeque;
2use std::io;
3use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use bytes::{Bytes, BytesMut};
7use thiserror::Error;
8use tokio::net::UdpSocket;
9use tokio::time::{self, sleep};
10use tracing::{debug, info, warn};
11
12use crate::error::ConfigValidationError;
13use crate::handshake::{
14    OfflinePacket, OpenConnectionReply1, OpenConnectionReply2, OpenConnectionRequest1,
15    OpenConnectionRequest2, Request2ParsePath,
16};
17use crate::protocol::connected::{
18    ConnectedControlPacket, ConnectedPing, ConnectedPong, ConnectionRequest,
19    ConnectionRequestAccepted, DetectLostConnection, DisconnectionNotification,
20    NewIncomingConnection, SYSTEM_ADDRESS_COUNT,
21};
22use crate::protocol::constants::{
23    DEFAULT_UNCONNECTED_MAGIC, MAXIMUM_MTU_SIZE, MINIMUM_MTU_SIZE, MTU_PROBE_ORDER,
24    RAKNET_PROTOCOL_VERSION,
25};
26use crate::protocol::datagram::Datagram;
27use crate::protocol::reliability::Reliability;
28use crate::protocol::sequence24::Sequence24;
29use crate::session::{
30    QueuePayloadResult, RakPriority, Session, SessionMetricsSnapshot, SessionState,
31};
32
/// Result alias used throughout the client API; the error type is always
/// [`RaknetClientError`].
pub type ClientResult<T> = Result<T, RaknetClientError>;
34
/// Per-send options accepted by `RaknetClient::send_with_options` and
/// `RaknetClient::send_with_receipt`.
///
/// Each field is handed through unchanged when the payload is queued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClientSendOptions {
    /// Reliability requested for the payload.
    pub reliability: Reliability,
    /// Channel value passed along with the payload.
    pub channel: u8,
    /// Priority requested for the payload.
    pub priority: RakPriority,
}
41
42impl Default for ClientSendOptions {
43    fn default() -> Self {
44        Self {
45            reliability: Reliability::ReliableOrdered,
46            channel: 0,
47            priority: RakPriority::High,
48        }
49    }
50}
51
/// Tunables for a [`RaknetClient`] connection.
///
/// `validate` enforces the constraints noted on the individual fields.
#[derive(Debug, Clone)]
pub struct RaknetClientConfig {
    /// Local address to bind the UDP socket to. When `None`, the bind address
    /// is derived from the server address via `default_bind_addr_for_server`.
    pub local_addr: Option<SocketAddr>,
    /// Client GUID sent in OpenConnectionRequest2 and ConnectionRequest.
    pub guid: u64,
    /// Protocol version sent in OpenConnectionRequest1.
    pub protocol_version: u8,
    /// Initial session MTU; also tried as the last MTU probe candidate.
    /// Must lie within `[MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE]`.
    pub mtu: u16,
    /// MTU values probed, in order, while sending OpenConnectionRequest1.
    pub mtu_probe_order: Vec<u16>,
    /// Number of OpenConnectionRequest1 sends per MTU candidate. Must be >= 1.
    pub mtu_probe_attempts_per_step: usize,
    /// How long to wait for a reply after each probe (capped by the overall
    /// handshake deadline). Must be > 0.
    pub mtu_probe_wait_per_attempt: Duration,
    /// Overall deadline for the whole handshake. Must be > 0.
    pub handshake_timeout: Duration,
    /// Receive timeout used by `next_event`; when it elapses without a
    /// datagram, an outbound flush runs. Must be > 0.
    pub outbound_tick_interval: Duration,
    /// Must be > 0. NOTE(review): consumer is not visible in this part of the
    /// file; presumably the keepalive cadence of the session.
    pub session_keepalive_interval: Duration,
    /// Must be > 0. NOTE(review): presumably the inactivity window checked by
    /// `check_idle_timeout_and_close` — confirm.
    pub session_idle_timeout: Duration,
    /// Size in bytes of the receive buffer. Must be >= `mtu`.
    pub recv_buffer_capacity: usize,
    /// Flush limit used on tick timeouts and on disconnect. Must be >= 1.
    pub max_new_datagrams_per_tick: usize,
    /// Flush limit used on tick timeouts and on disconnect. Must be >= `mtu`.
    pub max_new_bytes_per_tick: usize,
    /// Flush limit used on tick timeouts and on disconnect. Must be >= 1.
    pub max_resend_datagrams_per_tick: usize,
    /// Flush limit used on tick timeouts and on disconnect. Must be >= `mtu`.
    pub max_resend_bytes_per_tick: usize,
    /// Flush limit used after sends and during the handshake. Must be >= 1.
    pub max_new_datagrams_per_recv: usize,
    /// Flush limit used after sends and during the handshake. Must be >= `mtu`.
    pub max_new_bytes_per_recv: usize,
    /// Flush limit used after sends and during the handshake. Must be >= 1.
    pub max_resend_datagrams_per_recv: usize,
    /// Flush limit used after sends and during the handshake. Must be >= `mtu`.
    pub max_resend_bytes_per_recv: usize,
}
75
76impl Default for RaknetClientConfig {
77    fn default() -> Self {
78        Self {
79            local_addr: None,
80            guid: random_guid(),
81            protocol_version: RAKNET_PROTOCOL_VERSION,
82            mtu: 1200,
83            mtu_probe_order: MTU_PROBE_ORDER.to_vec(),
84            mtu_probe_attempts_per_step: 2,
85            mtu_probe_wait_per_attempt: Duration::from_millis(350),
86            handshake_timeout: Duration::from_secs(5),
87            outbound_tick_interval: Duration::from_millis(10),
88            session_keepalive_interval: Duration::from_secs(10),
89            session_idle_timeout: Duration::from_secs(30),
90            recv_buffer_capacity: MAXIMUM_MTU_SIZE as usize,
91            max_new_datagrams_per_tick: 8,
92            max_new_bytes_per_tick: 64 * 1024,
93            max_resend_datagrams_per_tick: 8,
94            max_resend_bytes_per_tick: 64 * 1024,
95            max_new_datagrams_per_recv: 6,
96            max_new_bytes_per_recv: 64 * 1024,
97            max_resend_datagrams_per_recv: 6,
98            max_resend_bytes_per_recv: 64 * 1024,
99        }
100    }
101}
102
103impl RaknetClientConfig {
104    pub fn validate(&self) -> Result<(), ConfigValidationError> {
105        if !(MINIMUM_MTU_SIZE..=MAXIMUM_MTU_SIZE).contains(&self.mtu) {
106            return Err(ConfigValidationError::new(
107                "RaknetClientConfig",
108                "mtu",
109                format!(
110                    "must be within [{MINIMUM_MTU_SIZE}, {MAXIMUM_MTU_SIZE}], got {}",
111                    self.mtu
112                ),
113            ));
114        }
115        if self.mtu_probe_attempts_per_step == 0 {
116            return Err(ConfigValidationError::new(
117                "RaknetClientConfig",
118                "mtu_probe_attempts_per_step",
119                "must be >= 1",
120            ));
121        }
122        if self.mtu_probe_wait_per_attempt.is_zero() {
123            return Err(ConfigValidationError::new(
124                "RaknetClientConfig",
125                "mtu_probe_wait_per_attempt",
126                "must be > 0",
127            ));
128        }
129        if self.handshake_timeout.is_zero() {
130            return Err(ConfigValidationError::new(
131                "RaknetClientConfig",
132                "handshake_timeout",
133                "must be > 0",
134            ));
135        }
136        if self.outbound_tick_interval.is_zero() {
137            return Err(ConfigValidationError::new(
138                "RaknetClientConfig",
139                "outbound_tick_interval",
140                "must be > 0",
141            ));
142        }
143        if self.session_keepalive_interval.is_zero() {
144            return Err(ConfigValidationError::new(
145                "RaknetClientConfig",
146                "session_keepalive_interval",
147                "must be > 0",
148            ));
149        }
150        if self.session_idle_timeout.is_zero() {
151            return Err(ConfigValidationError::new(
152                "RaknetClientConfig",
153                "session_idle_timeout",
154                "must be > 0",
155            ));
156        }
157        if self.recv_buffer_capacity < self.mtu as usize {
158            return Err(ConfigValidationError::new(
159                "RaknetClientConfig",
160                "recv_buffer_capacity",
161                format!(
162                    "must be >= mtu ({}), got {}",
163                    self.mtu, self.recv_buffer_capacity
164                ),
165            ));
166        }
167        if self.max_new_datagrams_per_tick == 0 {
168            return Err(ConfigValidationError::new(
169                "RaknetClientConfig",
170                "max_new_datagrams_per_tick",
171                "must be >= 1",
172            ));
173        }
174        if self.max_new_bytes_per_tick < self.mtu as usize {
175            return Err(ConfigValidationError::new(
176                "RaknetClientConfig",
177                "max_new_bytes_per_tick",
178                format!(
179                    "must be >= mtu ({}), got {}",
180                    self.mtu, self.max_new_bytes_per_tick
181                ),
182            ));
183        }
184        if self.max_resend_datagrams_per_tick == 0 {
185            return Err(ConfigValidationError::new(
186                "RaknetClientConfig",
187                "max_resend_datagrams_per_tick",
188                "must be >= 1",
189            ));
190        }
191        if self.max_resend_bytes_per_tick < self.mtu as usize {
192            return Err(ConfigValidationError::new(
193                "RaknetClientConfig",
194                "max_resend_bytes_per_tick",
195                format!(
196                    "must be >= mtu ({}), got {}",
197                    self.mtu, self.max_resend_bytes_per_tick
198                ),
199            ));
200        }
201        if self.max_new_datagrams_per_recv == 0 {
202            return Err(ConfigValidationError::new(
203                "RaknetClientConfig",
204                "max_new_datagrams_per_recv",
205                "must be >= 1",
206            ));
207        }
208        if self.max_new_bytes_per_recv < self.mtu as usize {
209            return Err(ConfigValidationError::new(
210                "RaknetClientConfig",
211                "max_new_bytes_per_recv",
212                format!(
213                    "must be >= mtu ({}), got {}",
214                    self.mtu, self.max_new_bytes_per_recv
215                ),
216            ));
217        }
218        if self.max_resend_datagrams_per_recv == 0 {
219            return Err(ConfigValidationError::new(
220                "RaknetClientConfig",
221                "max_resend_datagrams_per_recv",
222                "must be >= 1",
223            ));
224        }
225        if self.max_resend_bytes_per_recv < self.mtu as usize {
226            return Err(ConfigValidationError::new(
227                "RaknetClientConfig",
228                "max_resend_bytes_per_recv",
229                format!(
230                    "must be >= mtu ({}), got {}",
231                    self.mtu, self.max_resend_bytes_per_recv
232                ),
233            ));
234        }
235
236        Ok(())
237    }
238}
239
/// Handshake step reported in [`RaknetClientError::HandshakeTimeout`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeStage {
    /// Timed out while probing with OpenConnectionRequest1 (no reply1 seen).
    OpenConnectionRequest1,
    /// Timed out waiting for OpenConnectionReply2 after sending request2.
    OpenConnectionRequest2,
    /// Timed out waiting for ConnectionRequestAccepted.
    ConnectionRequestAccepted,
}
246
/// Reason carried by [`RaknetClientError::OfflineRejected`] when the server
/// answers the handshake with a rejection packet.
///
/// Values are produced by `offline_rejection_reason`, which is defined
/// outside this part of the file; every variant carries the server GUID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OfflineRejectionReason {
    /// The server reported an incompatible protocol version.
    /// NOTE(review): `protocol_version` is presumably the version the server
    /// supports — confirm against the packet definition.
    IncompatibleProtocolVersion {
        protocol_version: u8,
        server_guid: u64,
    },
    /// The server reported that the connection request failed.
    ConnectionRequestFailed {
        server_guid: u64,
    },
    /// The server reported that this client is already connected.
    AlreadyConnected {
        server_guid: u64,
    },
    /// The server reported that it has no free incoming connections.
    NoFreeIncomingConnections {
        server_guid: u64,
    },
    /// The server reported that the connection is banned.
    ConnectionBanned {
        server_guid: u64,
    },
    /// The server reported that this IP connected too recently.
    IpRecentlyConnected {
        server_guid: u64,
    },
}
269
/// Errors returned by the client API.
///
/// The type is `Clone + PartialEq`, which is why I/O failures are stored as
/// their rendered message rather than as an `io::Error`.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum RaknetClientError {
    /// An underlying I/O operation failed; `message` is the rendered
    /// `io::Error`.
    #[error("transport io error: {message}")]
    Io { message: String },
    /// A `RaknetClientConfig` or `ReconnectPolicy` failed validation.
    #[error("invalid client config: {details}")]
    InvalidConfig { details: String },
    /// The handshake did not complete before its deadline; `stage` names the
    /// step that was pending.
    #[error("handshake timed out at stage {stage:?}")]
    HandshakeTimeout { stage: HandshakeStage },
    /// The server answered the handshake with a rejection packet.
    #[error("handshake rejected by server: {reason:?}")]
    OfflineRejected { reason: OfflineRejectionReason },
    /// The handshake could not proceed, e.g. a session state transition was
    /// refused.
    #[error("handshake protocol violation: {details}")]
    HandshakeProtocolViolation { details: String },
    /// NOTE(review): presumably returned by `ensure_open` when the client has
    /// already been closed — producer is not visible here.
    #[error("client closed: {reason:?}")]
    Closed { reason: ClientDisconnectReason },
    /// NOTE(review): the three backpressure variants presumably mirror
    /// `QueuePayloadResult` outcomes — producer is not visible here.
    #[error("payload dropped by backpressure")]
    BackpressureDropped,
    #[error("payload deferred by backpressure")]
    BackpressureDeferred,
    #[error("backpressure requested disconnect")]
    BackpressureDisconnect,
}
291
292impl From<io::Error> for RaknetClientError {
293    fn from(value: io::Error) -> Self {
294        Self::Io {
295            message: value.to_string(),
296        }
297    }
298}
299
300impl From<RaknetClientError> for io::Error {
301    fn from(value: RaknetClientError) -> Self {
302        io::Error::other(value.to_string())
303    }
304}
305
/// Why a client connection was closed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientDisconnectReason {
    /// The local side called `disconnect`.
    Requested,
    /// NOTE(review): presumably set when backpressure asks for a disconnect;
    /// the producer is not visible here.
    Backpressure,
    /// NOTE(review): presumably set by `check_idle_timeout_and_close`.
    IdleTimeout,
    /// NOTE(review): presumably set when the server sends a
    /// DisconnectionNotification; `reason_code` is its optional reason byte.
    RemoteDisconnectionNotification { reason_code: Option<u8> },
    /// NOTE(review): presumably set when the server sends DetectLostConnection.
    RemoteDetectLostConnection,
    /// Receiving, inbound processing or an outbound tick failed; `message`
    /// describes the failure.
    TransportError { message: String },
}
315
/// Events yielded by `RaknetClient::next_event`.
#[derive(Debug)]
pub enum RaknetClientEvent {
    /// Queued once the handshake completes; carries the server address and
    /// the session MTU at that point.
    Connected {
        server_addr: SocketAddr,
        mtu: u16,
    },
    /// A received payload together with its reliability metadata.
    /// NOTE(review): produced by inbound processing that is not visible in
    /// this part of the file.
    Packet {
        payload: Bytes,
        reliability: Reliability,
        reliable_index: Option<Sequence24>,
        sequence_index: Option<Sequence24>,
        ordering_index: Option<Sequence24>,
        ordering_channel: Option<u8>,
    },
    /// NOTE(review): presumably reports that the payload sent with this
    /// `receipt_id` (see `send_with_receipt`) was acknowledged — confirm.
    ReceiptAcked {
        receipt_id: u64,
    },
    /// NOTE(review): presumably reports an inbound datagram that failed to
    /// decode; `error` is the rendered failure.
    DecodeError {
        error: String,
    },
    /// The connection was closed for the given reason.
    Disconnected {
        reason: ClientDisconnectReason,
    },
}
340
/// Retry policy used by `RaknetClient::connect_with_retry`.
#[derive(Debug, Clone)]
pub struct ReconnectPolicy {
    /// Total number of connection attempts, including the first. Must be >= 1.
    pub max_attempts: usize,
    /// Delay before the first retry; a zero delay skips the sleep.
    /// Must not exceed `max_backoff`.
    pub initial_backoff: Duration,
    /// Upper bound that `initial_backoff` is validated against.
    /// NOTE(review): presumably also caps the growth applied by
    /// `next_backoff` — that helper is not visible here.
    pub max_backoff: Duration,
    /// NOTE(review): the three flags below are consumed by
    /// `should_retry_connect` (not visible here); presumably they select
    /// which error classes are retried.
    pub retry_on_io: bool,
    pub retry_on_handshake_timeout: bool,
    pub fast_fail_on_offline_rejection: bool,
}
350
351impl Default for ReconnectPolicy {
352    fn default() -> Self {
353        Self {
354            max_attempts: 3,
355            initial_backoff: Duration::from_millis(200),
356            max_backoff: Duration::from_secs(3),
357            retry_on_io: true,
358            retry_on_handshake_timeout: true,
359            fast_fail_on_offline_rejection: true,
360        }
361    }
362}
363
364impl ReconnectPolicy {
365    pub fn validate(&self) -> Result<(), ConfigValidationError> {
366        if self.max_attempts == 0 {
367            return Err(ConfigValidationError::new(
368                "ReconnectPolicy",
369                "max_attempts",
370                "must be >= 1",
371            ));
372        }
373        if self.initial_backoff > self.max_backoff {
374            return Err(ConfigValidationError::new(
375                "ReconnectPolicy",
376                "initial_backoff",
377                format!(
378                    "must be <= max_backoff ({}ms), got {}ms",
379                    self.max_backoff.as_millis(),
380                    self.initial_backoff.as_millis()
381                ),
382            ));
383        }
384        Ok(())
385    }
386}
387
/// A RakNet client bound to one UDP socket and one server address.
pub struct RaknetClient {
    /// Socket used for all traffic with the server.
    socket: UdpSocket,
    /// Address of the server; datagrams from other senders are ignored by
    /// `next_event`.
    server_addr: SocketAddr,
    /// Session state machine, created with the configured MTU.
    session: Session,
    /// Configuration the client was created with.
    config: RaknetClientConfig,
    /// Scratch buffer for `recv_from`, sized by `recv_buffer_capacity`.
    recv_buffer: Vec<u8>,
    /// Events waiting to be returned by `next_event`.
    pending_events: VecDeque<RaknetClientEvent>,
    /// Set once the connection has been closed.
    closed: bool,
    /// NOTE(review): presumably records why the connection was closed; it is
    /// initialised to `None` and written outside this part of the file.
    close_reason: Option<ClientDisconnectReason>,
    /// Time of the last datagram received from the server (or of handshake
    /// completion).
    last_inbound_activity: Instant,
}
399
400impl RaknetClient {
401    pub async fn connect(server_addr: SocketAddr) -> ClientResult<Self> {
402        Self::connect_with_config(server_addr, RaknetClientConfig::default()).await
403    }
404
405    pub async fn connect_with_retry(
406        server_addr: SocketAddr,
407        config: RaknetClientConfig,
408        policy: ReconnectPolicy,
409    ) -> ClientResult<Self> {
410        policy
411            .validate()
412            .map_err(|error| RaknetClientError::InvalidConfig {
413                details: error.to_string(),
414            })?;
415
416        let mut attempt = 0usize;
417        let max_attempts = policy.max_attempts.max(1);
418        let mut backoff = policy.initial_backoff;
419        let mut last_error: Option<RaknetClientError> = None;
420
421        while attempt < max_attempts {
422            attempt = attempt.saturating_add(1);
423            match Self::connect_with_config(server_addr, config.clone()).await {
424                Ok(client) => return Ok(client),
425                Err(error) => {
426                    let should_retry =
427                        attempt < max_attempts && should_retry_connect(&error, &policy);
428                    last_error = Some(error);
429                    if !should_retry {
430                        break;
431                    }
432
433                    if !backoff.is_zero() {
434                        sleep(backoff).await;
435                    }
436                    backoff = next_backoff(backoff, &policy);
437                }
438            }
439        }
440
441        Err(
442            last_error.unwrap_or(RaknetClientError::HandshakeProtocolViolation {
443                details: "connect_with_retry terminated without a recorded error".to_string(),
444            }),
445        )
446    }
447
448    pub async fn connect_with_config(
449        server_addr: SocketAddr,
450        config: RaknetClientConfig,
451    ) -> ClientResult<Self> {
452        config
453            .validate()
454            .map_err(|error| RaknetClientError::InvalidConfig {
455                details: error.to_string(),
456            })?;
457
458        let local_addr = config
459            .local_addr
460            .unwrap_or_else(|| default_bind_addr_for_server(server_addr));
461        info!(%server_addr, %local_addr, "client connecting");
462        let socket = UdpSocket::bind(local_addr)
463            .await
464            .map_err(RaknetClientError::from)?;
465
466        let now = Instant::now();
467        let mut client = Self {
468            socket,
469            server_addr,
470            session: Session::new(config.mtu as usize),
471            recv_buffer: vec![0u8; config.recv_buffer_capacity],
472            config,
473            pending_events: VecDeque::new(),
474            closed: false,
475            close_reason: None,
476            last_inbound_activity: now,
477        };
478
479        client.perform_handshake().await?;
480        info!(
481            %server_addr,
482            mtu = client.session.mtu(),
483            "client handshake established"
484        );
485        client
486            .pending_events
487            .push_back(RaknetClientEvent::Connected {
488                server_addr,
489                mtu: client.session.mtu() as u16,
490            });
491
492        Ok(client)
493    }
494
    /// Returns the local address the client's UDP socket is bound to.
    ///
    /// # Errors
    /// Propagates the socket's `local_addr` error.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.socket.local_addr()
    }
498
    /// Returns the address of the server this client was created for.
    pub fn server_addr(&self) -> SocketAddr {
        self.server_addr
    }
502
    /// Returns the session's current metrics snapshot.
    pub fn metrics_snapshot(&self) -> SessionMetricsSnapshot {
        self.session.metrics_snapshot()
    }
506
507    pub async fn send(&mut self, payload: impl Into<Bytes>) -> ClientResult<()> {
508        self.send_with_options(payload, ClientSendOptions::default())
509            .await
510    }
511
512    pub async fn send_with_options(
513        &mut self,
514        payload: impl Into<Bytes>,
515        options: ClientSendOptions,
516    ) -> ClientResult<()> {
517        self.ensure_open()?;
518        self.queue_payload_with_optional_receipt(
519            payload.into(),
520            options.reliability,
521            options.channel,
522            options.priority,
523            None,
524        )?;
525        self.flush_outbound_with_limits(
526            self.config.max_new_datagrams_per_recv,
527            self.config.max_new_bytes_per_recv,
528            self.config.max_resend_datagrams_per_recv,
529            self.config.max_resend_bytes_per_recv,
530        )
531        .await
532    }
533
534    pub async fn send_with_receipt(
535        &mut self,
536        payload: impl Into<Bytes>,
537        receipt_id: u64,
538        options: ClientSendOptions,
539    ) -> ClientResult<()> {
540        self.ensure_open()?;
541        self.queue_payload_with_optional_receipt(
542            payload.into(),
543            options.reliability,
544            options.channel,
545            options.priority,
546            Some(receipt_id),
547        )?;
548        self.flush_outbound_with_limits(
549            self.config.max_new_datagrams_per_recv,
550            self.config.max_new_bytes_per_recv,
551            self.config.max_resend_datagrams_per_recv,
552            self.config.max_resend_bytes_per_recv,
553        )
554        .await
555    }
556
557    pub async fn disconnect(&mut self, reason_code: Option<u8>) -> ClientResult<()> {
558        if self.closed {
559            return Ok(());
560        }
561
562        let packet = ConnectedControlPacket::DisconnectionNotification(DisconnectionNotification {
563            reason: reason_code,
564        });
565
566        let queued = self.queue_connected_control_packet(
567            packet,
568            Reliability::ReliableOrdered,
569            0,
570            RakPriority::High,
571        );
572
573        if queued.is_ok() {
574            let _ = self
575                .flush_outbound_with_limits(
576                    self.config.max_new_datagrams_per_tick,
577                    self.config.max_new_bytes_per_tick,
578                    self.config.max_resend_datagrams_per_tick,
579                    self.config.max_resend_bytes_per_tick,
580                )
581                .await;
582        }
583
584        self.finish_close(ClientDisconnectReason::Requested);
585        queued
586    }
587
588    pub async fn next_event(&mut self) -> Option<RaknetClientEvent> {
589        if let Some(event) = self.pending_events.pop_front() {
590            return Some(event);
591        }
592        if self.closed {
593            return None;
594        }
595
596        loop {
597            if let Some(event) = self.pending_events.pop_front() {
598                return Some(event);
599            }
600            if self.closed {
601                return None;
602            }
603            if self.check_idle_timeout_and_close() {
604                continue;
605            }
606
607            match time::timeout(
608                self.config.outbound_tick_interval,
609                self.socket.recv_from(&mut self.recv_buffer),
610            )
611            .await
612            {
613                Ok(Ok((len, addr))) => {
614                    if addr != self.server_addr {
615                        continue;
616                    }
617                    self.last_inbound_activity = Instant::now();
618
619                    if let Err(error) = self.process_inbound_packet(len).await {
620                        self.finish_close(ClientDisconnectReason::TransportError {
621                            message: format!("inbound processing failed: {error}"),
622                        });
623                    }
624                }
625                Ok(Err(error)) => {
626                    self.finish_close(ClientDisconnectReason::TransportError {
627                        message: format!("udp receive failed: {error}"),
628                    });
629                }
630                Err(_) => {
631                    if let Err(error) = self
632                        .flush_outbound_with_limits(
633                            self.config.max_new_datagrams_per_tick,
634                            self.config.max_new_bytes_per_tick,
635                            self.config.max_resend_datagrams_per_tick,
636                            self.config.max_resend_bytes_per_tick,
637                        )
638                        .await
639                    {
640                        self.finish_close(ClientDisconnectReason::TransportError {
641                            message: format!("outbound tick failed: {error}"),
642                        });
643                    }
644                }
645            }
646        }
647    }
648
649    async fn perform_handshake(&mut self) -> ClientResult<()> {
650        debug!(server_addr = %self.server_addr, "starting client handshake");
651        if !self.session.transition_to(SessionState::Req1Recv) {
652            return Err(RaknetClientError::HandshakeProtocolViolation {
653                details: "session transition failed before request1".to_string(),
654            });
655        }
656
657        let deadline = Instant::now() + self.config.handshake_timeout;
658        let reply1 = self.probe_open_connection_reply1(deadline).await?;
659
660        if !self.session.transition_to(SessionState::Reply1Sent) {
661            return Err(RaknetClientError::HandshakeProtocolViolation {
662                details: "session transition failed after reply1".to_string(),
663            });
664        }
665
666        self.session
667            .set_mtu(reply1.mtu.clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE) as usize);
668
669        if !self.session.transition_to(SessionState::Req2Recv) {
670            return Err(RaknetClientError::HandshakeProtocolViolation {
671                details: "session transition failed before request2".to_string(),
672            });
673        }
674
675        let req2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
676            server_addr: self.server_addr,
677            mtu: reply1.mtu,
678            client_guid: self.config.guid,
679            cookie: reply1.cookie,
680            client_proof: false,
681            parse_path: if reply1.cookie.is_some() {
682                Request2ParsePath::StrictWithCookie
683            } else {
684                Request2ParsePath::StrictNoCookie
685            },
686            magic: DEFAULT_UNCONNECTED_MAGIC,
687        });
688        self.send_offline_packet(&req2).await?;
689
690        let _reply2 = self.wait_for_open_connection_reply2(deadline).await?;
691        if !self.session.transition_to(SessionState::Reply2Sent) {
692            return Err(RaknetClientError::HandshakeProtocolViolation {
693                details: "session transition failed after reply2".to_string(),
694            });
695        }
696
697        if !self.session.transition_to(SessionState::ConnReqRecv) {
698            return Err(RaknetClientError::HandshakeProtocolViolation {
699                details: "session transition failed before connection request".to_string(),
700            });
701        }
702
703        let request_time = unix_timestamp_millis();
704        self.queue_connected_control_packet(
705            ConnectedControlPacket::ConnectionRequest(ConnectionRequest {
706                client_guid: self.config.guid,
707                request_time,
708                use_encryption: false,
709            }),
710            Reliability::ReliableOrdered,
711            0,
712            RakPriority::High,
713        )?;
714        self.flush_outbound_with_limits(
715            self.config.max_new_datagrams_per_recv,
716            self.config.max_new_bytes_per_recv,
717            self.config.max_resend_datagrams_per_recv,
718            self.config.max_resend_bytes_per_recv,
719        )
720        .await?;
721
722        let accepted = self.wait_for_connection_request_accepted(deadline).await?;
723
724        if !self
725            .session
726            .transition_to(SessionState::ConnReqAcceptedSent)
727        {
728            return Err(RaknetClientError::HandshakeProtocolViolation {
729                details: "session transition failed after request accepted".to_string(),
730            });
731        }
732
733        if !self.session.transition_to(SessionState::NewIncomingRecv) {
734            return Err(RaknetClientError::HandshakeProtocolViolation {
735                details: "session transition failed before new incoming".to_string(),
736            });
737        }
738
739        self.queue_connected_control_packet(
740            ConnectedControlPacket::NewIncomingConnection(NewIncomingConnection {
741                server_addr: self.server_addr,
742                internal_addrs: build_internal_addrs(self.server_addr),
743                request_time: accepted.request_time,
744                accepted_time: accepted.accepted_time,
745            }),
746            Reliability::ReliableOrdered,
747            0,
748            RakPriority::High,
749        )?;
750
751        self.flush_outbound_with_limits(
752            self.config.max_new_datagrams_per_recv,
753            self.config.max_new_bytes_per_recv,
754            self.config.max_resend_datagrams_per_recv,
755            self.config.max_resend_bytes_per_recv,
756        )
757        .await?;
758
759        if !self.session.transition_to(SessionState::Connected) {
760            return Err(RaknetClientError::HandshakeProtocolViolation {
761                details: "session transition failed to connected".to_string(),
762            });
763        }
764
765        debug!(server_addr = %self.server_addr, "client handshake completed");
766        self.last_inbound_activity = Instant::now();
767        Ok(())
768    }
769
770    async fn probe_open_connection_reply1(
771        &mut self,
772        overall_deadline: Instant,
773    ) -> ClientResult<OpenConnectionReply1> {
774        let candidates = self.mtu_probe_candidates();
775        for mtu in candidates {
776            for _ in 0..self.config.mtu_probe_attempts_per_step {
777                if Instant::now() >= overall_deadline {
778                    warn!(
779                        server_addr = %self.server_addr,
780                        stage = ?HandshakeStage::OpenConnectionRequest1,
781                        "client handshake timed out"
782                    );
783                    return Err(RaknetClientError::HandshakeTimeout {
784                        stage: HandshakeStage::OpenConnectionRequest1,
785                    });
786                }
787
788                let req1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
789                    protocol_version: self.config.protocol_version,
790                    mtu,
791                    magic: DEFAULT_UNCONNECTED_MAGIC,
792                });
793                self.send_offline_packet(&req1).await?;
794
795                let per_attempt_deadline = std::cmp::min(
796                    overall_deadline,
797                    Instant::now() + self.config.mtu_probe_wait_per_attempt,
798                );
799
800                if let Some(reply) = self
801                    .wait_for_open_connection_reply1_in_window(per_attempt_deadline)
802                    .await?
803                {
804                    return Ok(reply);
805                }
806            }
807        }
808
809        warn!(
810            server_addr = %self.server_addr,
811            stage = ?HandshakeStage::OpenConnectionRequest1,
812            "client handshake timed out after mtu probing"
813        );
814        Err(RaknetClientError::HandshakeTimeout {
815            stage: HandshakeStage::OpenConnectionRequest1,
816        })
817    }
818
819    fn mtu_probe_candidates(&self) -> Vec<u16> {
820        let mut out = Vec::new();
821        for candidate in self
822            .config
823            .mtu_probe_order
824            .iter()
825            .copied()
826            .chain([self.config.mtu])
827        {
828            let mtu = candidate
829                .clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE)
830                .max(MINIMUM_MTU_SIZE);
831            if !out.contains(&mtu) {
832                out.push(mtu);
833            }
834        }
835
836        if out.is_empty() {
837            out.push(self.config.mtu);
838        }
839
840        out
841    }
842
843    async fn wait_for_open_connection_reply1_in_window(
844        &mut self,
845        deadline: Instant,
846    ) -> ClientResult<Option<OpenConnectionReply1>> {
847        loop {
848            let packet = match self.recv_packet_until(deadline).await? {
849                Some(packet) => packet,
850                None => return Ok(None),
851            };
852
853            let mut src = &packet[..];
854            let Ok(offline) = OfflinePacket::decode(&mut src) else {
855                continue;
856            };
857
858            if let Some(reason) = offline_rejection_reason(&offline) {
859                return Err(RaknetClientError::OfflineRejected { reason });
860            }
861
862            if let OfflinePacket::OpenConnectionReply1(reply) = offline {
863                return Ok(Some(reply));
864            }
865        }
866    }
867
868    async fn wait_for_open_connection_reply2(
869        &mut self,
870        deadline: Instant,
871    ) -> ClientResult<OpenConnectionReply2> {
872        loop {
873            let packet = match self.recv_packet_until(deadline).await? {
874                Some(packet) => packet,
875                None => {
876                    warn!(
877                        server_addr = %self.server_addr,
878                        stage = ?HandshakeStage::OpenConnectionRequest2,
879                        "client handshake timed out waiting for reply2"
880                    );
881                    return Err(RaknetClientError::HandshakeTimeout {
882                        stage: HandshakeStage::OpenConnectionRequest2,
883                    });
884                }
885            };
886
887            let mut src = &packet[..];
888            let Ok(offline) = OfflinePacket::decode(&mut src) else {
889                continue;
890            };
891
892            if let Some(reason) = offline_rejection_reason(&offline) {
893                return Err(RaknetClientError::OfflineRejected { reason });
894            }
895
896            if let OfflinePacket::OpenConnectionReply2(reply) = offline {
897                return Ok(reply);
898            }
899        }
900    }
901
902    async fn wait_for_connection_request_accepted(
903        &mut self,
904        deadline: Instant,
905    ) -> ClientResult<ConnectionRequestAccepted> {
906        loop {
907            let packet = match self.recv_packet_until(deadline).await? {
908                Some(packet) => packet,
909                None => {
910                    warn!(
911                        server_addr = %self.server_addr,
912                        stage = ?HandshakeStage::ConnectionRequestAccepted,
913                        "client handshake timed out waiting for request accepted"
914                    );
915                    return Err(RaknetClientError::HandshakeTimeout {
916                        stage: HandshakeStage::ConnectionRequestAccepted,
917                    });
918                }
919            };
920
921            let mut offline_src = &packet[..];
922            if let Ok(offline) = OfflinePacket::decode(&mut offline_src) {
923                if let Some(reason) = offline_rejection_reason(&offline) {
924                    return Err(RaknetClientError::OfflineRejected { reason });
925                }
926                continue;
927            }
928
929            let mut src = &packet[..];
930            let datagram = match Datagram::decode(&mut src) {
931                Ok(datagram) => datagram,
932                Err(_) => continue,
933            };
934
935            if let Some(accepted) = self.process_handshake_datagram(datagram).await? {
936                return Ok(accepted);
937            }
938        }
939    }
940
941    async fn process_handshake_datagram(
942        &mut self,
943        datagram: Datagram,
944    ) -> ClientResult<Option<ConnectionRequestAccepted>> {
945        let now = Instant::now();
946        let frames = self
947            .session
948            .ingest_datagram(datagram, now)
949            .map_err(invalid_data_client_error)?;
950        let _ = self.session.process_incoming_receipts(now);
951
952        let mut accepted = None;
953        for frame in frames {
954            let Some(first) = frame.payload.first().copied() else {
955                continue;
956            };
957            if !is_connected_control_id(first) {
958                continue;
959            }
960
961            let mut control_payload = &frame.payload[..];
962            let control =
963                ConnectedControlPacket::decode(&mut control_payload).map_err(|error| {
964                    RaknetClientError::HandshakeProtocolViolation {
965                        details: format!("failed to decode handshake control packet: {error}"),
966                    }
967                })?;
968
969            match control {
970                ConnectedControlPacket::ConnectionRequestAccepted(pkt) => {
971                    accepted = Some(pkt);
972                }
973                ConnectedControlPacket::ConnectedPing(ping) => {
974                    self.queue_connected_control_packet(
975                        ConnectedControlPacket::ConnectedPong(ConnectedPong {
976                            ping_time: ping.ping_time,
977                            pong_time: unix_timestamp_millis(),
978                        }),
979                        Reliability::Unreliable,
980                        0,
981                        RakPriority::Immediate,
982                    )?;
983                }
984                ConnectedControlPacket::DisconnectionNotification(pkt) => {
985                    return Err(RaknetClientError::Closed {
986                        reason: ClientDisconnectReason::RemoteDisconnectionNotification {
987                            reason_code: pkt.reason,
988                        },
989                    });
990                }
991                ConnectedControlPacket::DetectLostConnection(_) => {
992                    return Err(RaknetClientError::Closed {
993                        reason: ClientDisconnectReason::RemoteDetectLostConnection,
994                    });
995                }
996                ConnectedControlPacket::ConnectedPong(_)
997                | ConnectedControlPacket::ConnectionRequest(_)
998                | ConnectedControlPacket::NewIncomingConnection(_) => {}
999            }
1000        }
1001
1002        self.flush_outbound_with_limits(
1003            self.config.max_new_datagrams_per_recv,
1004            self.config.max_new_bytes_per_recv,
1005            self.config.max_resend_datagrams_per_recv,
1006            self.config.max_resend_bytes_per_recv,
1007        )
1008        .await?;
1009
1010        Ok(accepted)
1011    }
1012
1013    async fn recv_packet_until(&mut self, deadline: Instant) -> ClientResult<Option<Vec<u8>>> {
1014        loop {
1015            let remaining = deadline.saturating_duration_since(Instant::now());
1016            if remaining.is_zero() {
1017                return Ok(None);
1018            }
1019
1020            let recv = match time::timeout(remaining, self.socket.recv_from(&mut self.recv_buffer))
1021                .await
1022            {
1023                Ok(result) => result,
1024                Err(_) => return Ok(None),
1025            }
1026            .map_err(RaknetClientError::from)?;
1027
1028            let (len, addr) = recv;
1029            if addr != self.server_addr {
1030                continue;
1031            }
1032
1033            let mut packet = Vec::with_capacity(len);
1034            packet.extend_from_slice(&self.recv_buffer[..len]);
1035            return Ok(Some(packet));
1036        }
1037    }
1038
1039    async fn process_inbound_packet(&mut self, len: usize) -> ClientResult<()> {
1040        let payload = &self.recv_buffer[..len];
1041        let Some(first) = payload.first().copied() else {
1042            return Ok(());
1043        };
1044
1045        if is_offline_packet_id(first) {
1046            self.pending_events
1047                .push_back(RaknetClientEvent::DecodeError {
1048                    error: format!("unexpected offline packet id while connected: 0x{first:02x}"),
1049                });
1050            return Ok(());
1051        }
1052
1053        let mut src = payload;
1054        let datagram = match Datagram::decode(&mut src) {
1055            Ok(datagram) => datagram,
1056            Err(error) => {
1057                self.pending_events
1058                    .push_back(RaknetClientEvent::DecodeError {
1059                        error: error.to_string(),
1060                    });
1061                return Ok(());
1062            }
1063        };
1064
1065        self.process_connected_datagram(datagram).await
1066    }
1067
1068    async fn process_connected_datagram(&mut self, datagram: Datagram) -> ClientResult<()> {
1069        let now = Instant::now();
1070        let frames = match self.session.ingest_datagram(datagram, now) {
1071            Ok(frames) => frames,
1072            Err(error) => {
1073                self.pending_events
1074                    .push_back(RaknetClientEvent::DecodeError {
1075                        error: error.to_string(),
1076                    });
1077                return Ok(());
1078            }
1079        };
1080
1081        let receipts = self.session.process_incoming_receipts(now);
1082        for receipt_id in receipts.acked_receipt_ids {
1083            self.pending_events
1084                .push_back(RaknetClientEvent::ReceiptAcked { receipt_id });
1085        }
1086
1087        for frame in frames {
1088            let Some(first) = frame.payload.first().copied() else {
1089                continue;
1090            };
1091
1092            if is_connected_control_id(first) {
1093                let mut control_payload = &frame.payload[..];
1094                let control = match ConnectedControlPacket::decode(&mut control_payload) {
1095                    Ok(control) => control,
1096                    Err(error) => {
1097                        self.pending_events
1098                            .push_back(RaknetClientEvent::DecodeError {
1099                                error: error.to_string(),
1100                            });
1101                        continue;
1102                    }
1103                };
1104
1105                self.apply_connected_control(control)?;
1106                continue;
1107            }
1108
1109            self.pending_events.push_back(RaknetClientEvent::Packet {
1110                payload: frame.payload,
1111                reliability: frame.header.reliability,
1112                reliable_index: frame.reliable_index,
1113                sequence_index: frame.sequence_index,
1114                ordering_index: frame.ordering_index,
1115                ordering_channel: frame.ordering_channel,
1116            });
1117        }
1118
1119        // Client facade is poll-driven; make sure control ACK/NACK can flush
1120        // immediately on this receive path without waiting for a later poll tick.
1121        self.session.force_control_flush_deadlines(now);
1122
1123        self.flush_outbound_with_limits(
1124            self.config.max_new_datagrams_per_recv,
1125            self.config.max_new_bytes_per_recv,
1126            self.config.max_resend_datagrams_per_recv,
1127            self.config.max_resend_bytes_per_recv,
1128        )
1129        .await
1130    }
1131
1132    fn apply_connected_control(&mut self, control: ConnectedControlPacket) -> ClientResult<()> {
1133        match control {
1134            ConnectedControlPacket::ConnectedPing(ping) => {
1135                self.queue_connected_control_packet(
1136                    ConnectedControlPacket::ConnectedPong(ConnectedPong {
1137                        ping_time: ping.ping_time,
1138                        pong_time: unix_timestamp_millis(),
1139                    }),
1140                    Reliability::Unreliable,
1141                    0,
1142                    RakPriority::Immediate,
1143                )?;
1144            }
1145            ConnectedControlPacket::DisconnectionNotification(pkt) => {
1146                self.finish_close(ClientDisconnectReason::RemoteDisconnectionNotification {
1147                    reason_code: pkt.reason,
1148                });
1149            }
1150            ConnectedControlPacket::DetectLostConnection(DetectLostConnection) => {
1151                self.finish_close(ClientDisconnectReason::RemoteDetectLostConnection);
1152            }
1153            ConnectedControlPacket::ConnectionRequest(_)
1154            | ConnectedControlPacket::ConnectionRequestAccepted(_)
1155            | ConnectedControlPacket::NewIncomingConnection(_)
1156            | ConnectedControlPacket::ConnectedPong(_) => {}
1157        }
1158
1159        Ok(())
1160    }
1161
1162    fn queue_connected_control_packet(
1163        &mut self,
1164        packet: ConnectedControlPacket,
1165        reliability: Reliability,
1166        channel: u8,
1167        priority: RakPriority,
1168    ) -> ClientResult<()> {
1169        let mut out = BytesMut::new();
1170        packet.encode(&mut out).map_err(invalid_data_client_error)?;
1171        self.queue_payload_with_optional_receipt(out.freeze(), reliability, channel, priority, None)
1172    }
1173
1174    fn queue_payload_with_optional_receipt(
1175        &mut self,
1176        payload: Bytes,
1177        reliability: Reliability,
1178        channel: u8,
1179        priority: RakPriority,
1180        receipt_id: Option<u64>,
1181    ) -> ClientResult<()> {
1182        let decision = if let Some(receipt_id) = receipt_id {
1183            self.session.queue_payload_with_receipt(
1184                payload,
1185                reliability,
1186                channel,
1187                priority,
1188                Some(receipt_id),
1189            )
1190        } else {
1191            self.session
1192                .queue_payload(payload, reliability, channel, priority)
1193        };
1194
1195        match decision {
1196            QueuePayloadResult::Enqueued { .. } => Ok(()),
1197            QueuePayloadResult::Dropped => Err(RaknetClientError::BackpressureDropped),
1198            QueuePayloadResult::Deferred => Err(RaknetClientError::BackpressureDeferred),
1199            QueuePayloadResult::DisconnectRequested => {
1200                self.finish_close(ClientDisconnectReason::Backpressure);
1201                Err(RaknetClientError::BackpressureDisconnect)
1202            }
1203        }
1204    }
1205
1206    async fn flush_outbound_with_limits(
1207        &mut self,
1208        max_new_datagrams: usize,
1209        max_new_bytes: usize,
1210        max_resend_datagrams: usize,
1211        max_resend_bytes: usize,
1212    ) -> ClientResult<()> {
1213        if self.closed {
1214            return Ok(());
1215        }
1216
1217        self.queue_keepalive_ping();
1218
1219        let now = Instant::now();
1220        let datagrams = self.session.on_tick(
1221            now,
1222            max_new_datagrams,
1223            max_new_bytes,
1224            max_resend_datagrams,
1225            max_resend_bytes,
1226        );
1227
1228        for datagram in &datagrams {
1229            self.send_datagram(datagram).await?;
1230        }
1231
1232        if self.session.take_backpressure_disconnect() {
1233            self.finish_close(ClientDisconnectReason::Backpressure);
1234            return Err(RaknetClientError::BackpressureDisconnect);
1235        }
1236
1237        Ok(())
1238    }
1239
1240    fn queue_keepalive_ping(&mut self) {
1241        let now = Instant::now();
1242        if !self
1243            .session
1244            .should_send_keepalive(now, self.config.session_keepalive_interval)
1245        {
1246            return;
1247        }
1248
1249        let ping = ConnectedControlPacket::ConnectedPing(ConnectedPing {
1250            ping_time: unix_timestamp_millis(),
1251        });
1252
1253        let decision = {
1254            let mut out = BytesMut::new();
1255            if ping.encode(&mut out).is_err() {
1256                return;
1257            }
1258            self.session
1259                .queue_payload(out.freeze(), Reliability::Unreliable, 0, RakPriority::Low)
1260        };
1261
1262        if matches!(decision, QueuePayloadResult::Enqueued { .. }) {
1263            self.session.mark_keepalive_sent(now);
1264        }
1265    }
1266
1267    fn check_idle_timeout_and_close(&mut self) -> bool {
1268        if self.closed
1269            || self.config.session_idle_timeout.is_zero()
1270            || self.session.state() != SessionState::Connected
1271        {
1272            return false;
1273        }
1274
1275        let idle = Instant::now().saturating_duration_since(self.last_inbound_activity);
1276        if idle >= self.config.session_idle_timeout {
1277            self.finish_close(ClientDisconnectReason::IdleTimeout);
1278            return true;
1279        }
1280
1281        false
1282    }
1283
1284    async fn send_offline_packet(&self, packet: &OfflinePacket) -> ClientResult<()> {
1285        let mut out = BytesMut::new();
1286        packet.encode(&mut out).map_err(invalid_data_client_error)?;
1287        let _written = self
1288            .socket
1289            .send_to(&out, self.server_addr)
1290            .await
1291            .map_err(RaknetClientError::from)?;
1292        Ok(())
1293    }
1294
1295    async fn send_datagram(&self, datagram: &Datagram) -> ClientResult<()> {
1296        let mut out = BytesMut::with_capacity(datagram.encoded_size());
1297        datagram
1298            .encode(&mut out)
1299            .map_err(invalid_data_client_error)?;
1300        let _written = self
1301            .socket
1302            .send_to(&out, self.server_addr)
1303            .await
1304            .map_err(RaknetClientError::from)?;
1305        Ok(())
1306    }
1307
1308    fn ensure_open(&self) -> ClientResult<()> {
1309        if self.closed {
1310            return Err(RaknetClientError::Closed {
1311                reason: self.close_reason.clone().unwrap_or(
1312                    ClientDisconnectReason::TransportError {
1313                        message: "closed without explicit reason".to_string(),
1314                    },
1315                ),
1316            });
1317        }
1318        Ok(())
1319    }
1320
1321    fn finish_close(&mut self, reason: ClientDisconnectReason) {
1322        if self.closed {
1323            return;
1324        }
1325
1326        if self.session.state() == SessionState::Connected {
1327            let _ = self.session.transition_to(SessionState::Closing);
1328        }
1329        let _ = self.session.transition_to(SessionState::Closed);
1330
1331        self.close_reason = Some(reason.clone());
1332        self.closed = true;
1333        match &reason {
1334            ClientDisconnectReason::Requested
1335            | ClientDisconnectReason::RemoteDisconnectionNotification { .. }
1336            | ClientDisconnectReason::RemoteDetectLostConnection => {
1337                info!(server_addr = %self.server_addr, ?reason, "client closed")
1338            }
1339            ClientDisconnectReason::Backpressure
1340            | ClientDisconnectReason::IdleTimeout
1341            | ClientDisconnectReason::TransportError { .. } => {
1342                warn!(server_addr = %self.server_addr, ?reason, "client closed")
1343            }
1344        }
1345        self.pending_events
1346            .push_back(RaknetClientEvent::Disconnected { reason });
1347    }
1348}
1349
1350fn should_retry_connect(error: &RaknetClientError, policy: &ReconnectPolicy) -> bool {
1351    match error {
1352        RaknetClientError::OfflineRejected { .. } => !policy.fast_fail_on_offline_rejection,
1353        RaknetClientError::HandshakeTimeout { .. } => policy.retry_on_handshake_timeout,
1354        RaknetClientError::Io { .. } => policy.retry_on_io,
1355        RaknetClientError::InvalidConfig { .. } => false,
1356        RaknetClientError::HandshakeProtocolViolation { .. }
1357        | RaknetClientError::Closed { .. }
1358        | RaknetClientError::BackpressureDropped
1359        | RaknetClientError::BackpressureDeferred
1360        | RaknetClientError::BackpressureDisconnect => false,
1361    }
1362}
1363
1364fn next_backoff(current: Duration, policy: &ReconnectPolicy) -> Duration {
1365    if current.is_zero() {
1366        return Duration::from_millis(1).min(policy.max_backoff);
1367    }
1368
1369    let doubled = current.saturating_mul(2);
1370    if doubled > policy.max_backoff {
1371        policy.max_backoff
1372    } else {
1373        doubled
1374    }
1375}
1376
1377fn invalid_data_client_error<E: std::fmt::Display>(error: E) -> RaknetClientError {
1378    RaknetClientError::HandshakeProtocolViolation {
1379        details: error.to_string(),
1380    }
1381}
1382
1383fn offline_rejection_reason(packet: &OfflinePacket) -> Option<OfflineRejectionReason> {
1384    match packet {
1385        OfflinePacket::IncompatibleProtocolVersion(pkt) => {
1386            Some(OfflineRejectionReason::IncompatibleProtocolVersion {
1387                protocol_version: pkt.protocol_version,
1388                server_guid: pkt.server_guid,
1389            })
1390        }
1391        OfflinePacket::ConnectionRequestFailed(pkt) => {
1392            Some(OfflineRejectionReason::ConnectionRequestFailed {
1393                server_guid: pkt.server_guid,
1394            })
1395        }
1396        OfflinePacket::AlreadyConnected(pkt) => Some(OfflineRejectionReason::AlreadyConnected {
1397            server_guid: pkt.server_guid,
1398        }),
1399        OfflinePacket::NoFreeIncomingConnections(pkt) => {
1400            Some(OfflineRejectionReason::NoFreeIncomingConnections {
1401                server_guid: pkt.server_guid,
1402            })
1403        }
1404        OfflinePacket::ConnectionBanned(pkt) => Some(OfflineRejectionReason::ConnectionBanned {
1405            server_guid: pkt.server_guid,
1406        }),
1407        OfflinePacket::IpRecentlyConnected(pkt) => {
1408            Some(OfflineRejectionReason::IpRecentlyConnected {
1409                server_guid: pkt.server_guid,
1410            })
1411        }
1412        _ => None,
1413    }
1414}
1415
/// Returns `true` when `id` is one of the packet ids this client treats as offline
/// (unconnected) traffic.
fn is_offline_packet_id(id: u8) -> bool {
    const OFFLINE_IDS: [u8; 13] = [
        0x01, 0x02, 0x05, 0x06, 0x07, 0x08, 0x11, 0x12, 0x14, 0x17, 0x19, 0x1A, 0x1C,
    ];
    OFFLINE_IDS.contains(&id)
}
1422
/// Returns `true` when `id` is one of the packet ids this client decodes as a connected
/// control packet.
fn is_connected_control_id(id: u8) -> bool {
    const CONTROL_IDS: [u8; 7] = [0x00, 0x03, 0x04, 0x09, 0x10, 0x13, 0x15];
    CONTROL_IDS.contains(&id)
}
1426
/// Picks the local bind address to use when none was configured.
///
/// The result is the unspecified address with port 0 in the same address family as
/// `server_addr`. For IPv6 the server's scope id is carried over and the flow info is zero.
fn default_bind_addr_for_server(server_addr: SocketAddr) -> SocketAddr {
    if let SocketAddr::V6(remote) = server_addr {
        let wildcard = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, remote.scope_id());
        return SocketAddr::V6(wildcard);
    }

    SocketAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
}
1438
1439fn build_internal_addrs(server_addr: SocketAddr) -> [SocketAddr; SYSTEM_ADDRESS_COUNT] {
1440    let fallback = match server_addr {
1441        SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
1442        SocketAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(
1443            Ipv6Addr::UNSPECIFIED,
1444            0,
1445            0,
1446            v6.scope_id(),
1447        )),
1448    };
1449
1450    let mut addrs = [fallback; SYSTEM_ADDRESS_COUNT];
1451    addrs[0] = server_addr;
1452    addrs
1453}
1454
1455fn random_guid() -> u64 {
1456    let mut bytes = [0u8; 8];
1457    if getrandom::getrandom(&mut bytes).is_ok() {
1458        return u64::from_le_bytes(bytes);
1459    }
1460
1461    let now = unix_timestamp_millis() as u64;
1462    now ^ 0xA5A5_5A5A_DEAD_BEEF
1463}
1464
/// Returns the milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Values that do not fit in `i64` are clamped to `i64::MAX`; a clock set before the epoch
/// yields `0`.
fn unix_timestamp_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
1471
/// Unit tests for the free helper functions and config validation of the client module.
#[cfg(test)]
mod tests {
    use super::{
        ClientSendOptions, OfflineRejectionReason, RaknetClientConfig, RaknetClientError,
        ReconnectPolicy, is_connected_control_id, is_offline_packet_id, next_backoff,
        offline_rejection_reason, should_retry_connect,
    };
    use crate::handshake::{ConnectionBanned, OfflinePacket};
    use crate::protocol::reliability::Reliability;
    use crate::session::RakPriority;
    use std::time::Duration;

    #[test]
    fn id_guards_cover_known_ranges() {
        assert!(is_offline_packet_id(0x05));
        assert!(is_offline_packet_id(0x1C));
        assert!(!is_offline_packet_id(0xFF));

        assert!(is_connected_control_id(0x10));
        assert!(!is_connected_control_id(0x11));
    }

    /// The receive paths branch on these guards, so no id may satisfy both of them.
    #[test]
    fn id_guards_are_disjoint() {
        for id in 0u8..=255 {
            assert!(
                !(is_offline_packet_id(id) && is_connected_control_id(id)),
                "id 0x{id:02x} is classified as both offline and connected control"
            );
        }
    }

    #[test]
    fn send_options_default_matches_server_defaults() {
        let options = ClientSendOptions::default();
        assert_eq!(options.reliability, Reliability::ReliableOrdered);
        assert_eq!(options.channel, 0);
        assert_eq!(options.priority, RakPriority::High);
    }

    #[test]
    fn rejection_mapping_extracts_reason() {
        let packet = OfflinePacket::ConnectionBanned(ConnectionBanned {
            server_guid: 7,
            magic: crate::protocol::constants::DEFAULT_UNCONNECTED_MAGIC,
        });
        assert_eq!(
            offline_rejection_reason(&packet),
            Some(OfflineRejectionReason::ConnectionBanned { server_guid: 7 })
        );
    }

    #[test]
    fn retry_policy_fast_fail_respects_offline_rejection() {
        let err = RaknetClientError::OfflineRejected {
            reason: OfflineRejectionReason::ConnectionBanned { server_guid: 1 },
        };
        let policy = ReconnectPolicy::default();
        assert!(!should_retry_connect(&err, &policy));

        let mut relaxed = policy;
        relaxed.fast_fail_on_offline_rejection = false;
        assert!(should_retry_connect(&err, &relaxed));
    }

    #[test]
    fn backoff_growth_respects_cap() {
        let policy = ReconnectPolicy {
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_millis(250),
            ..ReconnectPolicy::default()
        };
        assert_eq!(
            next_backoff(Duration::from_millis(100), &policy),
            Duration::from_millis(200)
        );
        assert_eq!(
            next_backoff(Duration::from_millis(200), &policy),
            Duration::from_millis(250)
        );
    }

    /// A zero delay must restart the sequence at one millisecond instead of staying at zero.
    #[test]
    fn backoff_from_zero_starts_at_one_millisecond() {
        let policy = ReconnectPolicy {
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_millis(250),
            ..ReconnectPolicy::default()
        };
        assert_eq!(
            next_backoff(Duration::ZERO, &policy),
            Duration::from_millis(1)
        );
    }

    // NOTE(review): despite its name this test only checks that a config with an empty probe
    // order validates; it does not inspect the probe candidate list, which presumably needs a
    // constructed client to build.
    #[test]
    fn mtu_probe_candidates_include_configured_mtu_even_when_order_empty() {
        let cfg = RaknetClientConfig {
            mtu: 1300,
            mtu_probe_order: Vec::new(),
            ..RaknetClientConfig::default()
        };
        assert_eq!(cfg.mtu, 1300);
        cfg.validate().expect("config should be valid");
    }

    #[test]
    fn client_config_validate_rejects_out_of_range_mtu() {
        let cfg = RaknetClientConfig {
            mtu: 10,
            ..RaknetClientConfig::default()
        };
        let err = cfg
            .validate()
            .expect_err("invalid MTU must be rejected by validate()");
        assert_eq!(err.config, "RaknetClientConfig");
        assert_eq!(err.field, "mtu");
    }

    #[test]
    fn reconnect_policy_validate_rejects_zero_attempts() {
        let policy = ReconnectPolicy {
            max_attempts: 0,
            ..ReconnectPolicy::default()
        };
        let err = policy
            .validate()
            .expect_err("max_attempts=0 must be rejected");
        assert_eq!(err.config, "ReconnectPolicy");
        assert_eq!(err.field, "max_attempts");
    }
}