1use std::collections::VecDeque;
7use std::io;
8use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use bytes::{Bytes, BytesMut};
12use thiserror::Error;
13use tokio::net::UdpSocket;
14use tokio::time::{self, sleep};
15use tracing::{debug, info, warn};
16
17use crate::error::ConfigValidationError;
18use crate::handshake::{
19 OfflinePacket, OpenConnectionReply1, OpenConnectionReply2, OpenConnectionRequest1,
20 OpenConnectionRequest2, Request2ParsePath,
21};
22use crate::protocol::connected::{
23 ConnectedControlPacket, ConnectedPing, ConnectedPong, ConnectionRequest,
24 ConnectionRequestAccepted, DetectLostConnection, DisconnectionNotification,
25 NewIncomingConnection, SYSTEM_ADDRESS_COUNT,
26};
27use crate::protocol::constants::{
28 DEFAULT_UNCONNECTED_MAGIC, MAXIMUM_MTU_SIZE, MINIMUM_MTU_SIZE, MTU_PROBE_ORDER,
29 RAKNET_PROTOCOL_VERSION,
30};
31use crate::protocol::datagram::Datagram;
32use crate::protocol::reliability::Reliability;
33use crate::protocol::sequence24::Sequence24;
34use crate::session::{
35 QueuePayloadResult, RakPriority, Session, SessionMetricsSnapshot, SessionState,
36};
37
/// Result alias used by the client API; the error type is always
/// [`RaknetClientError`].
pub type ClientResult<T> = Result<T, RaknetClientError>;
39
/// Per-send delivery options accepted by `RaknetClient::send_with_options`
/// and `RaknetClient::send_with_receipt`.
///
/// All three values are forwarded unchanged to the session when the payload
/// is queued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClientSendOptions {
    /// Reliability mode requested for the payload.
    pub reliability: Reliability,
    /// Channel number passed to the session together with the payload.
    pub channel: u8,
    /// Queue priority requested for the payload.
    pub priority: RakPriority,
}
50
51impl Default for ClientSendOptions {
52 fn default() -> Self {
53 Self {
54 reliability: Reliability::ReliableOrdered,
55 channel: 0,
56 priority: RakPriority::High,
57 }
58 }
59}
60
/// Tunables for a `RaknetClient` connection.
///
/// `validate` is run by `RaknetClient::connect_with_config` before any
/// socket is opened.
#[derive(Debug, Clone)]
pub struct RaknetClientConfig {
    /// Local address to bind; when `None` a default is derived from the
    /// server address via `default_bind_addr_for_server`.
    pub local_addr: Option<SocketAddr>,
    /// Client GUID sent in OpenConnectionRequest2 and ConnectionRequest.
    pub guid: u64,
    /// Protocol version sent in OpenConnectionRequest1.
    pub protocol_version: u8,
    /// Initial session MTU; also appended as the last MTU probe candidate.
    /// Must lie within `[MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE]`.
    pub mtu: u16,
    /// MTU values to probe, in order, during the Request1 phase.
    pub mtu_probe_order: Vec<u16>,
    /// Number of OpenConnectionRequest1 packets sent per MTU candidate (>= 1).
    pub mtu_probe_attempts_per_step: usize,
    /// How long to wait for OpenConnectionReply1 after each probe (> 0).
    pub mtu_probe_wait_per_attempt: Duration,
    /// Overall deadline for the whole handshake (> 0).
    pub handshake_timeout: Duration,
    /// Receive timeout used by `next_event`; when it elapses an outbound
    /// flush is performed (> 0).
    pub outbound_tick_interval: Duration,
    /// Validated to be > 0. Its consumer is outside this section of the
    /// file; presumably the keepalive ping cadence (confirm).
    pub session_keepalive_interval: Duration,
    /// Validated to be > 0. Its consumer is outside this section of the
    /// file; presumably read by `check_idle_timeout_and_close` (confirm).
    pub session_idle_timeout: Duration,
    /// Size in bytes of the reusable receive buffer; must be >= `mtu`.
    pub recv_buffer_capacity: usize,
    /// Flush limit: new datagrams per outbound tick (>= 1).
    pub max_new_datagrams_per_tick: usize,
    /// Flush limit: new bytes per outbound tick (>= `mtu`).
    pub max_new_bytes_per_tick: usize,
    /// Flush limit: resent datagrams per outbound tick (>= 1).
    pub max_resend_datagrams_per_tick: usize,
    /// Flush limit: resent bytes per outbound tick (>= `mtu`).
    pub max_resend_bytes_per_tick: usize,
    /// Flush limit: new datagrams per flush triggered by a send or by an
    /// inbound datagram (>= 1).
    pub max_new_datagrams_per_recv: usize,
    /// Flush limit: new bytes per flush triggered by a send or by an inbound
    /// datagram (>= `mtu`).
    pub max_new_bytes_per_recv: usize,
    /// Flush limit: resent datagrams per flush triggered by a send or by an
    /// inbound datagram (>= 1).
    pub max_resend_datagrams_per_recv: usize,
    /// Flush limit: resent bytes per flush triggered by a send or by an
    /// inbound datagram (>= `mtu`).
    pub max_resend_bytes_per_recv: usize,
}
85
86impl Default for RaknetClientConfig {
87 fn default() -> Self {
88 Self {
89 local_addr: None,
90 guid: random_guid(),
91 protocol_version: RAKNET_PROTOCOL_VERSION,
92 mtu: 1200,
93 mtu_probe_order: MTU_PROBE_ORDER.to_vec(),
94 mtu_probe_attempts_per_step: 2,
95 mtu_probe_wait_per_attempt: Duration::from_millis(350),
96 handshake_timeout: Duration::from_secs(5),
97 outbound_tick_interval: Duration::from_millis(10),
98 session_keepalive_interval: Duration::from_secs(10),
99 session_idle_timeout: Duration::from_secs(30),
100 recv_buffer_capacity: MAXIMUM_MTU_SIZE as usize,
101 max_new_datagrams_per_tick: 8,
102 max_new_bytes_per_tick: 64 * 1024,
103 max_resend_datagrams_per_tick: 8,
104 max_resend_bytes_per_tick: 64 * 1024,
105 max_new_datagrams_per_recv: 6,
106 max_new_bytes_per_recv: 64 * 1024,
107 max_resend_datagrams_per_recv: 6,
108 max_resend_bytes_per_recv: 64 * 1024,
109 }
110 }
111}
112
113impl RaknetClientConfig {
114 pub fn validate(&self) -> Result<(), ConfigValidationError> {
116 if !(MINIMUM_MTU_SIZE..=MAXIMUM_MTU_SIZE).contains(&self.mtu) {
117 return Err(ConfigValidationError::new(
118 "RaknetClientConfig",
119 "mtu",
120 format!(
121 "must be within [{MINIMUM_MTU_SIZE}, {MAXIMUM_MTU_SIZE}], got {}",
122 self.mtu
123 ),
124 ));
125 }
126 if self.mtu_probe_attempts_per_step == 0 {
127 return Err(ConfigValidationError::new(
128 "RaknetClientConfig",
129 "mtu_probe_attempts_per_step",
130 "must be >= 1",
131 ));
132 }
133 if self.mtu_probe_wait_per_attempt.is_zero() {
134 return Err(ConfigValidationError::new(
135 "RaknetClientConfig",
136 "mtu_probe_wait_per_attempt",
137 "must be > 0",
138 ));
139 }
140 if self.handshake_timeout.is_zero() {
141 return Err(ConfigValidationError::new(
142 "RaknetClientConfig",
143 "handshake_timeout",
144 "must be > 0",
145 ));
146 }
147 if self.outbound_tick_interval.is_zero() {
148 return Err(ConfigValidationError::new(
149 "RaknetClientConfig",
150 "outbound_tick_interval",
151 "must be > 0",
152 ));
153 }
154 if self.session_keepalive_interval.is_zero() {
155 return Err(ConfigValidationError::new(
156 "RaknetClientConfig",
157 "session_keepalive_interval",
158 "must be > 0",
159 ));
160 }
161 if self.session_idle_timeout.is_zero() {
162 return Err(ConfigValidationError::new(
163 "RaknetClientConfig",
164 "session_idle_timeout",
165 "must be > 0",
166 ));
167 }
168 if self.recv_buffer_capacity < self.mtu as usize {
169 return Err(ConfigValidationError::new(
170 "RaknetClientConfig",
171 "recv_buffer_capacity",
172 format!(
173 "must be >= mtu ({}), got {}",
174 self.mtu, self.recv_buffer_capacity
175 ),
176 ));
177 }
178 if self.max_new_datagrams_per_tick == 0 {
179 return Err(ConfigValidationError::new(
180 "RaknetClientConfig",
181 "max_new_datagrams_per_tick",
182 "must be >= 1",
183 ));
184 }
185 if self.max_new_bytes_per_tick < self.mtu as usize {
186 return Err(ConfigValidationError::new(
187 "RaknetClientConfig",
188 "max_new_bytes_per_tick",
189 format!(
190 "must be >= mtu ({}), got {}",
191 self.mtu, self.max_new_bytes_per_tick
192 ),
193 ));
194 }
195 if self.max_resend_datagrams_per_tick == 0 {
196 return Err(ConfigValidationError::new(
197 "RaknetClientConfig",
198 "max_resend_datagrams_per_tick",
199 "must be >= 1",
200 ));
201 }
202 if self.max_resend_bytes_per_tick < self.mtu as usize {
203 return Err(ConfigValidationError::new(
204 "RaknetClientConfig",
205 "max_resend_bytes_per_tick",
206 format!(
207 "must be >= mtu ({}), got {}",
208 self.mtu, self.max_resend_bytes_per_tick
209 ),
210 ));
211 }
212 if self.max_new_datagrams_per_recv == 0 {
213 return Err(ConfigValidationError::new(
214 "RaknetClientConfig",
215 "max_new_datagrams_per_recv",
216 "must be >= 1",
217 ));
218 }
219 if self.max_new_bytes_per_recv < self.mtu as usize {
220 return Err(ConfigValidationError::new(
221 "RaknetClientConfig",
222 "max_new_bytes_per_recv",
223 format!(
224 "must be >= mtu ({}), got {}",
225 self.mtu, self.max_new_bytes_per_recv
226 ),
227 ));
228 }
229 if self.max_resend_datagrams_per_recv == 0 {
230 return Err(ConfigValidationError::new(
231 "RaknetClientConfig",
232 "max_resend_datagrams_per_recv",
233 "must be >= 1",
234 ));
235 }
236 if self.max_resend_bytes_per_recv < self.mtu as usize {
237 return Err(ConfigValidationError::new(
238 "RaknetClientConfig",
239 "max_resend_bytes_per_recv",
240 format!(
241 "must be >= mtu ({}), got {}",
242 self.mtu, self.max_resend_bytes_per_recv
243 ),
244 ));
245 }
246
247 Ok(())
248 }
249}
250
/// Handshake phase reported by [`RaknetClientError::HandshakeTimeout`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeStage {
    /// Timed out while probing with OpenConnectionRequest1 (no Reply1 seen).
    OpenConnectionRequest1,
    /// Timed out after sending OpenConnectionRequest2 (no Reply2 seen).
    OpenConnectionRequest2,
    /// Timed out while waiting for ConnectionRequestAccepted.
    ConnectionRequestAccepted,
}
258
/// Reason carried by [`RaknetClientError::OfflineRejected`].
///
/// Values are produced by `offline_rejection_reason` from an offline packet
/// received during the handshake. Every variant carries the `server_guid`
/// taken from that packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OfflineRejectionReason {
    /// The server reported an incompatible protocol version;
    /// `protocol_version` is the value carried by the rejection packet
    /// (presumably the server's own version; confirm against the decoder).
    IncompatibleProtocolVersion {
        protocol_version: u8,
        server_guid: u64,
    },
    /// The server reported that the connection request failed.
    ConnectionRequestFailed {
        server_guid: u64,
    },
    /// The server reported that this client is already connected.
    AlreadyConnected {
        server_guid: u64,
    },
    /// The server reported that it has no free incoming connection slots.
    NoFreeIncomingConnections {
        server_guid: u64,
    },
    /// The server reported that this client is banned.
    ConnectionBanned {
        server_guid: u64,
    },
    /// The server reported that this IP connected too recently.
    IpRecentlyConnected {
        server_guid: u64,
    },
}
282
/// Error type returned by the fallible `RaknetClient` operations.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum RaknetClientError {
    /// A socket operation failed; `message` is the rendered `io::Error`.
    #[error("transport io error: {message}")]
    Io { message: String },
    /// A `RaknetClientConfig` or `ReconnectPolicy` failed validation.
    #[error("invalid client config: {details}")]
    InvalidConfig { details: String },
    /// The handshake deadline elapsed while waiting at `stage`.
    #[error("handshake timed out at stage {stage:?}")]
    HandshakeTimeout { stage: HandshakeStage },
    /// The server answered the handshake with an offline rejection packet.
    #[error("handshake rejected by server: {reason:?}")]
    OfflineRejected { reason: OfflineRejectionReason },
    /// The handshake could not proceed (refused session state transition or
    /// undecodable handshake control packet).
    #[error("handshake protocol violation: {details}")]
    HandshakeProtocolViolation { details: String },
    /// The connection is closed; returned during the handshake when the
    /// remote side disconnects (other producers may exist outside this view).
    #[error("client closed: {reason:?}")]
    Closed { reason: ClientDisconnectReason },
    /// The session reported `QueuePayloadResult::Dropped` for a payload.
    #[error("payload dropped by backpressure")]
    BackpressureDropped,
    /// The session reported `QueuePayloadResult::Deferred` for a payload.
    #[error("payload deferred by backpressure")]
    BackpressureDeferred,
    /// The session reported `QueuePayloadResult::DisconnectRequested`; the
    /// client has been closed with `ClientDisconnectReason::Backpressure`.
    #[error("backpressure requested disconnect")]
    BackpressureDisconnect,
}
305
306impl From<io::Error> for RaknetClientError {
307 fn from(value: io::Error) -> Self {
308 Self::Io {
309 message: value.to_string(),
310 }
311 }
312}
313
314impl From<RaknetClientError> for io::Error {
315 fn from(value: RaknetClientError) -> Self {
316 io::Error::other(value.to_string())
317 }
318}
319
/// Why a client connection was closed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientDisconnectReason {
    /// The local application called `RaknetClient::disconnect`.
    Requested,
    /// The session asked for a disconnect while queueing a payload
    /// (`QueuePayloadResult::DisconnectRequested`).
    Backpressure,
    /// Not produced in this section of the file; presumably set by
    /// `check_idle_timeout_and_close` (confirm).
    IdleTimeout,
    /// The server sent a DisconnectionNotification; `reason_code` is the
    /// optional reason carried by that packet.
    RemoteDisconnectionNotification { reason_code: Option<u8> },
    /// The server sent a DetectLostConnection control packet.
    RemoteDetectLostConnection,
    /// Receiving, inbound processing, or an outbound tick failed; `message`
    /// describes the failure.
    TransportError { message: String },
}
330
/// Events yielded by `RaknetClient::next_event`.
#[derive(Debug)]
pub enum RaknetClientEvent {
    /// Queued once by `connect_with_config` after a successful handshake;
    /// `mtu` is the session MTU at that point.
    Connected {
        server_addr: SocketAddr,
        mtu: u16,
    },
    /// An application payload (a frame whose first byte is not a connected
    /// control id) together with the frame's delivery metadata.
    Packet {
        payload: Bytes,
        reliability: Reliability,
        reliable_index: Option<Sequence24>,
        sequence_index: Option<Sequence24>,
        ordering_index: Option<Sequence24>,
        ordering_channel: Option<u8>,
    },
    /// The session reported `receipt_id` among its acknowledged receipts.
    ReceiptAcked {
        receipt_id: u64,
    },
    /// An inbound datagram or control packet could not be decoded or
    /// ingested, or an offline packet arrived while connected.
    DecodeError {
        error: String,
    },
    /// The connection was closed. Not queued in this section of the file;
    /// presumably emitted by `finish_close` (confirm).
    Disconnected {
        reason: ClientDisconnectReason,
    },
}
356
/// Retry policy used by `RaknetClient::connect_with_retry`.
#[derive(Debug, Clone)]
pub struct ReconnectPolicy {
    /// Maximum number of connection attempts (>= 1).
    pub max_attempts: usize,
    /// Delay before the second attempt; must be <= `max_backoff`.
    pub initial_backoff: Duration,
    /// Upper bound checked against `initial_backoff` by `validate`;
    /// presumably also caps the growth applied by `next_backoff` (confirm).
    pub max_backoff: Duration,
    /// Consumed by `should_retry_connect`, which is outside this view;
    /// presumably enables retrying after I/O errors (confirm).
    pub retry_on_io: bool,
    /// Consumed by `should_retry_connect`, which is outside this view;
    /// presumably enables retrying after handshake timeouts (confirm).
    pub retry_on_handshake_timeout: bool,
    /// Consumed by `should_retry_connect`, which is outside this view;
    /// presumably stops retrying after an offline rejection (confirm).
    pub fast_fail_on_offline_rejection: bool,
}
367
368impl Default for ReconnectPolicy {
369 fn default() -> Self {
370 Self {
371 max_attempts: 3,
372 initial_backoff: Duration::from_millis(200),
373 max_backoff: Duration::from_secs(3),
374 retry_on_io: true,
375 retry_on_handshake_timeout: true,
376 fast_fail_on_offline_rejection: true,
377 }
378 }
379}
380
381impl ReconnectPolicy {
382 pub fn validate(&self) -> Result<(), ConfigValidationError> {
384 if self.max_attempts == 0 {
385 return Err(ConfigValidationError::new(
386 "ReconnectPolicy",
387 "max_attempts",
388 "must be >= 1",
389 ));
390 }
391 if self.initial_backoff > self.max_backoff {
392 return Err(ConfigValidationError::new(
393 "ReconnectPolicy",
394 "initial_backoff",
395 format!(
396 "must be <= max_backoff ({}ms), got {}ms",
397 self.max_backoff.as_millis(),
398 self.initial_backoff.as_millis()
399 ),
400 ));
401 }
402 Ok(())
403 }
404}
405
/// A single RakNet client connection over one UDP socket.
///
/// Created through `connect`, `connect_with_config`, or `connect_with_retry`,
/// all of which complete the handshake before returning.
pub struct RaknetClient {
    /// UDP socket bound in `connect_with_config`.
    socket: UdpSocket,
    /// Remote server address; datagrams from any other source are ignored.
    server_addr: SocketAddr,
    /// Session that ingests inbound datagrams and queues outbound payloads.
    session: Session,
    /// Configuration the client was created with.
    config: RaknetClientConfig,
    /// Reusable receive buffer of `config.recv_buffer_capacity` bytes.
    recv_buffer: Vec<u8>,
    /// Events waiting to be returned by `next_event`.
    pending_events: VecDeque<RaknetClientEvent>,
    /// Set once the connection has been closed; checked by `disconnect` and
    /// `next_event`.
    closed: bool,
    /// Initialised to `None`; presumably filled in by `finish_close`
    /// (defined outside this view; confirm).
    close_reason: Option<ClientDisconnectReason>,
    /// Updated whenever a datagram from the server is received and when the
    /// handshake completes; presumably the basis for the idle timeout
    /// (confirm against `check_idle_timeout_and_close`).
    last_inbound_activity: Instant,
}
418
419impl RaknetClient {
420 pub async fn connect(server_addr: SocketAddr) -> ClientResult<Self> {
422 Self::connect_with_config(server_addr, RaknetClientConfig::default()).await
423 }
424
425 pub async fn connect_with_retry(
427 server_addr: SocketAddr,
428 config: RaknetClientConfig,
429 policy: ReconnectPolicy,
430 ) -> ClientResult<Self> {
431 policy
432 .validate()
433 .map_err(|error| RaknetClientError::InvalidConfig {
434 details: error.to_string(),
435 })?;
436
437 let mut attempt = 0usize;
438 let max_attempts = policy.max_attempts.max(1);
439 let mut backoff = policy.initial_backoff;
440 let mut last_error: Option<RaknetClientError> = None;
441
442 while attempt < max_attempts {
443 attempt = attempt.saturating_add(1);
444 match Self::connect_with_config(server_addr, config.clone()).await {
445 Ok(client) => return Ok(client),
446 Err(error) => {
447 let should_retry =
448 attempt < max_attempts && should_retry_connect(&error, &policy);
449 last_error = Some(error);
450 if !should_retry {
451 break;
452 }
453
454 if !backoff.is_zero() {
455 sleep(backoff).await;
456 }
457 backoff = next_backoff(backoff, &policy);
458 }
459 }
460 }
461
462 Err(
463 last_error.unwrap_or(RaknetClientError::HandshakeProtocolViolation {
464 details: "connect_with_retry terminated without a recorded error".to_string(),
465 }),
466 )
467 }
468
469 pub async fn connect_with_config(
471 server_addr: SocketAddr,
472 config: RaknetClientConfig,
473 ) -> ClientResult<Self> {
474 config
475 .validate()
476 .map_err(|error| RaknetClientError::InvalidConfig {
477 details: error.to_string(),
478 })?;
479
480 let local_addr = config
481 .local_addr
482 .unwrap_or_else(|| default_bind_addr_for_server(server_addr));
483 info!(%server_addr, %local_addr, "client connecting");
484 let socket = UdpSocket::bind(local_addr)
485 .await
486 .map_err(RaknetClientError::from)?;
487
488 let now = Instant::now();
489 let mut client = Self {
490 socket,
491 server_addr,
492 session: Session::new(config.mtu as usize),
493 recv_buffer: vec![0u8; config.recv_buffer_capacity],
494 config,
495 pending_events: VecDeque::new(),
496 closed: false,
497 close_reason: None,
498 last_inbound_activity: now,
499 };
500
501 client.perform_handshake().await?;
502 info!(
503 %server_addr,
504 mtu = client.session.mtu(),
505 "client handshake established"
506 );
507 client
508 .pending_events
509 .push_back(RaknetClientEvent::Connected {
510 server_addr,
511 mtu: client.session.mtu() as u16,
512 });
513
514 Ok(client)
515 }
516
517 pub fn local_addr(&self) -> io::Result<SocketAddr> {
519 self.socket.local_addr()
520 }
521
522 pub fn server_addr(&self) -> SocketAddr {
524 self.server_addr
525 }
526
527 pub fn metrics_snapshot(&self) -> SessionMetricsSnapshot {
529 self.session.metrics_snapshot()
530 }
531
532 pub async fn send(&mut self, payload: impl Into<Bytes>) -> ClientResult<()> {
534 self.send_with_options(payload, ClientSendOptions::default())
535 .await
536 }
537
538 pub async fn send_with_options(
540 &mut self,
541 payload: impl Into<Bytes>,
542 options: ClientSendOptions,
543 ) -> ClientResult<()> {
544 self.ensure_open()?;
545 self.queue_payload_with_optional_receipt(
546 payload.into(),
547 options.reliability,
548 options.channel,
549 options.priority,
550 None,
551 )?;
552 self.flush_outbound_with_limits(
553 self.config.max_new_datagrams_per_recv,
554 self.config.max_new_bytes_per_recv,
555 self.config.max_resend_datagrams_per_recv,
556 self.config.max_resend_bytes_per_recv,
557 )
558 .await
559 }
560
561 pub async fn send_with_receipt(
563 &mut self,
564 payload: impl Into<Bytes>,
565 receipt_id: u64,
566 options: ClientSendOptions,
567 ) -> ClientResult<()> {
568 self.ensure_open()?;
569 self.queue_payload_with_optional_receipt(
570 payload.into(),
571 options.reliability,
572 options.channel,
573 options.priority,
574 Some(receipt_id),
575 )?;
576 self.flush_outbound_with_limits(
577 self.config.max_new_datagrams_per_recv,
578 self.config.max_new_bytes_per_recv,
579 self.config.max_resend_datagrams_per_recv,
580 self.config.max_resend_bytes_per_recv,
581 )
582 .await
583 }
584
585 pub async fn disconnect(&mut self, reason_code: Option<u8>) -> ClientResult<()> {
587 if self.closed {
588 return Ok(());
589 }
590
591 let packet = ConnectedControlPacket::DisconnectionNotification(DisconnectionNotification {
592 reason: reason_code,
593 });
594
595 let queued = self.queue_connected_control_packet(
596 packet,
597 Reliability::ReliableOrdered,
598 0,
599 RakPriority::High,
600 );
601
602 if queued.is_ok() {
603 let _ = self
604 .flush_outbound_with_limits(
605 self.config.max_new_datagrams_per_tick,
606 self.config.max_new_bytes_per_tick,
607 self.config.max_resend_datagrams_per_tick,
608 self.config.max_resend_bytes_per_tick,
609 )
610 .await;
611 }
612
613 self.finish_close(ClientDisconnectReason::Requested);
614 queued
615 }
616
617 pub async fn next_event(&mut self) -> Option<RaknetClientEvent> {
621 if let Some(event) = self.pending_events.pop_front() {
622 return Some(event);
623 }
624 if self.closed {
625 return None;
626 }
627
628 loop {
629 if let Some(event) = self.pending_events.pop_front() {
630 return Some(event);
631 }
632 if self.closed {
633 return None;
634 }
635 if self.check_idle_timeout_and_close() {
636 continue;
637 }
638
639 match time::timeout(
640 self.config.outbound_tick_interval,
641 self.socket.recv_from(&mut self.recv_buffer),
642 )
643 .await
644 {
645 Ok(Ok((len, addr))) => {
646 if addr != self.server_addr {
647 continue;
648 }
649 self.last_inbound_activity = Instant::now();
650
651 if let Err(error) = self.process_inbound_packet(len).await {
652 self.finish_close(ClientDisconnectReason::TransportError {
653 message: format!("inbound processing failed: {error}"),
654 });
655 }
656 }
657 Ok(Err(error)) => {
658 self.finish_close(ClientDisconnectReason::TransportError {
659 message: format!("udp receive failed: {error}"),
660 });
661 }
662 Err(_) => {
663 if let Err(error) = self
664 .flush_outbound_with_limits(
665 self.config.max_new_datagrams_per_tick,
666 self.config.max_new_bytes_per_tick,
667 self.config.max_resend_datagrams_per_tick,
668 self.config.max_resend_bytes_per_tick,
669 )
670 .await
671 {
672 self.finish_close(ClientDisconnectReason::TransportError {
673 message: format!("outbound tick failed: {error}"),
674 });
675 }
676 }
677 }
678 }
679 }
680
681 async fn perform_handshake(&mut self) -> ClientResult<()> {
682 debug!(server_addr = %self.server_addr, "starting client handshake");
683 if !self.session.transition_to(SessionState::Req1Recv) {
684 return Err(RaknetClientError::HandshakeProtocolViolation {
685 details: "session transition failed before request1".to_string(),
686 });
687 }
688
689 let deadline = Instant::now() + self.config.handshake_timeout;
690 let reply1 = self.probe_open_connection_reply1(deadline).await?;
691
692 if !self.session.transition_to(SessionState::Reply1Sent) {
693 return Err(RaknetClientError::HandshakeProtocolViolation {
694 details: "session transition failed after reply1".to_string(),
695 });
696 }
697
698 self.session
699 .set_mtu(reply1.mtu.clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE) as usize);
700
701 if !self.session.transition_to(SessionState::Req2Recv) {
702 return Err(RaknetClientError::HandshakeProtocolViolation {
703 details: "session transition failed before request2".to_string(),
704 });
705 }
706
707 let req2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
708 server_addr: self.server_addr,
709 mtu: reply1.mtu,
710 client_guid: self.config.guid,
711 cookie: reply1.cookie,
712 client_proof: false,
713 parse_path: if reply1.cookie.is_some() {
714 Request2ParsePath::StrictWithCookie
715 } else {
716 Request2ParsePath::StrictNoCookie
717 },
718 magic: DEFAULT_UNCONNECTED_MAGIC,
719 });
720 self.send_offline_packet(&req2).await?;
721
722 let _reply2 = self.wait_for_open_connection_reply2(deadline).await?;
723 if !self.session.transition_to(SessionState::Reply2Sent) {
724 return Err(RaknetClientError::HandshakeProtocolViolation {
725 details: "session transition failed after reply2".to_string(),
726 });
727 }
728
729 if !self.session.transition_to(SessionState::ConnReqRecv) {
730 return Err(RaknetClientError::HandshakeProtocolViolation {
731 details: "session transition failed before connection request".to_string(),
732 });
733 }
734
735 let request_time = unix_timestamp_millis();
736 self.queue_connected_control_packet(
737 ConnectedControlPacket::ConnectionRequest(ConnectionRequest {
738 client_guid: self.config.guid,
739 request_time,
740 use_encryption: false,
741 }),
742 Reliability::ReliableOrdered,
743 0,
744 RakPriority::High,
745 )?;
746 self.flush_outbound_with_limits(
747 self.config.max_new_datagrams_per_recv,
748 self.config.max_new_bytes_per_recv,
749 self.config.max_resend_datagrams_per_recv,
750 self.config.max_resend_bytes_per_recv,
751 )
752 .await?;
753
754 let accepted = self.wait_for_connection_request_accepted(deadline).await?;
755
756 if !self
757 .session
758 .transition_to(SessionState::ConnReqAcceptedSent)
759 {
760 return Err(RaknetClientError::HandshakeProtocolViolation {
761 details: "session transition failed after request accepted".to_string(),
762 });
763 }
764
765 if !self.session.transition_to(SessionState::NewIncomingRecv) {
766 return Err(RaknetClientError::HandshakeProtocolViolation {
767 details: "session transition failed before new incoming".to_string(),
768 });
769 }
770
771 self.queue_connected_control_packet(
772 ConnectedControlPacket::NewIncomingConnection(NewIncomingConnection {
773 server_addr: self.server_addr,
774 internal_addrs: build_internal_addrs(self.server_addr),
775 request_time: accepted.request_time,
776 accepted_time: accepted.accepted_time,
777 }),
778 Reliability::ReliableOrdered,
779 0,
780 RakPriority::High,
781 )?;
782
783 self.flush_outbound_with_limits(
784 self.config.max_new_datagrams_per_recv,
785 self.config.max_new_bytes_per_recv,
786 self.config.max_resend_datagrams_per_recv,
787 self.config.max_resend_bytes_per_recv,
788 )
789 .await?;
790
791 if !self.session.transition_to(SessionState::Connected) {
792 return Err(RaknetClientError::HandshakeProtocolViolation {
793 details: "session transition failed to connected".to_string(),
794 });
795 }
796
797 debug!(server_addr = %self.server_addr, "client handshake completed");
798 self.last_inbound_activity = Instant::now();
799 Ok(())
800 }
801
802 async fn probe_open_connection_reply1(
803 &mut self,
804 overall_deadline: Instant,
805 ) -> ClientResult<OpenConnectionReply1> {
806 let candidates = self.mtu_probe_candidates();
807 for mtu in candidates {
808 for _ in 0..self.config.mtu_probe_attempts_per_step {
809 if Instant::now() >= overall_deadline {
810 warn!(
811 server_addr = %self.server_addr,
812 stage = ?HandshakeStage::OpenConnectionRequest1,
813 "client handshake timed out"
814 );
815 return Err(RaknetClientError::HandshakeTimeout {
816 stage: HandshakeStage::OpenConnectionRequest1,
817 });
818 }
819
820 let req1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
821 protocol_version: self.config.protocol_version,
822 mtu,
823 magic: DEFAULT_UNCONNECTED_MAGIC,
824 });
825 self.send_offline_packet(&req1).await?;
826
827 let per_attempt_deadline = std::cmp::min(
828 overall_deadline,
829 Instant::now() + self.config.mtu_probe_wait_per_attempt,
830 );
831
832 if let Some(reply) = self
833 .wait_for_open_connection_reply1_in_window(per_attempt_deadline)
834 .await?
835 {
836 return Ok(reply);
837 }
838 }
839 }
840
841 warn!(
842 server_addr = %self.server_addr,
843 stage = ?HandshakeStage::OpenConnectionRequest1,
844 "client handshake timed out after mtu probing"
845 );
846 Err(RaknetClientError::HandshakeTimeout {
847 stage: HandshakeStage::OpenConnectionRequest1,
848 })
849 }
850
851 fn mtu_probe_candidates(&self) -> Vec<u16> {
852 let mut out = Vec::new();
853 for candidate in self
854 .config
855 .mtu_probe_order
856 .iter()
857 .copied()
858 .chain([self.config.mtu])
859 {
860 let mtu = candidate
861 .clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE)
862 .max(MINIMUM_MTU_SIZE);
863 if !out.contains(&mtu) {
864 out.push(mtu);
865 }
866 }
867
868 if out.is_empty() {
869 out.push(self.config.mtu);
870 }
871
872 out
873 }
874
875 async fn wait_for_open_connection_reply1_in_window(
876 &mut self,
877 deadline: Instant,
878 ) -> ClientResult<Option<OpenConnectionReply1>> {
879 loop {
880 let packet = match self.recv_packet_until(deadline).await? {
881 Some(packet) => packet,
882 None => return Ok(None),
883 };
884
885 let mut src = &packet[..];
886 let Ok(offline) = OfflinePacket::decode(&mut src) else {
887 continue;
888 };
889
890 if let Some(reason) = offline_rejection_reason(&offline) {
891 return Err(RaknetClientError::OfflineRejected { reason });
892 }
893
894 if let OfflinePacket::OpenConnectionReply1(reply) = offline {
895 return Ok(Some(reply));
896 }
897 }
898 }
899
900 async fn wait_for_open_connection_reply2(
901 &mut self,
902 deadline: Instant,
903 ) -> ClientResult<OpenConnectionReply2> {
904 loop {
905 let packet = match self.recv_packet_until(deadline).await? {
906 Some(packet) => packet,
907 None => {
908 warn!(
909 server_addr = %self.server_addr,
910 stage = ?HandshakeStage::OpenConnectionRequest2,
911 "client handshake timed out waiting for reply2"
912 );
913 return Err(RaknetClientError::HandshakeTimeout {
914 stage: HandshakeStage::OpenConnectionRequest2,
915 });
916 }
917 };
918
919 let mut src = &packet[..];
920 let Ok(offline) = OfflinePacket::decode(&mut src) else {
921 continue;
922 };
923
924 if let Some(reason) = offline_rejection_reason(&offline) {
925 return Err(RaknetClientError::OfflineRejected { reason });
926 }
927
928 if let OfflinePacket::OpenConnectionReply2(reply) = offline {
929 return Ok(reply);
930 }
931 }
932 }
933
934 async fn wait_for_connection_request_accepted(
935 &mut self,
936 deadline: Instant,
937 ) -> ClientResult<ConnectionRequestAccepted> {
938 loop {
939 let packet = match self.recv_packet_until(deadline).await? {
940 Some(packet) => packet,
941 None => {
942 warn!(
943 server_addr = %self.server_addr,
944 stage = ?HandshakeStage::ConnectionRequestAccepted,
945 "client handshake timed out waiting for request accepted"
946 );
947 return Err(RaknetClientError::HandshakeTimeout {
948 stage: HandshakeStage::ConnectionRequestAccepted,
949 });
950 }
951 };
952
953 let mut offline_src = &packet[..];
954 if let Ok(offline) = OfflinePacket::decode(&mut offline_src) {
955 if let Some(reason) = offline_rejection_reason(&offline) {
956 return Err(RaknetClientError::OfflineRejected { reason });
957 }
958 continue;
959 }
960
961 let mut src = &packet[..];
962 let datagram = match Datagram::decode(&mut src) {
963 Ok(datagram) => datagram,
964 Err(_) => continue,
965 };
966
967 if let Some(accepted) = self.process_handshake_datagram(datagram).await? {
968 return Ok(accepted);
969 }
970 }
971 }
972
973 async fn process_handshake_datagram(
974 &mut self,
975 datagram: Datagram,
976 ) -> ClientResult<Option<ConnectionRequestAccepted>> {
977 let now = Instant::now();
978 let frames = self
979 .session
980 .ingest_datagram(datagram, now)
981 .map_err(invalid_data_client_error)?;
982 let _ = self.session.process_incoming_receipts(now);
983
984 let mut accepted = None;
985 for frame in frames {
986 let Some(first) = frame.payload.first().copied() else {
987 continue;
988 };
989 if !is_connected_control_id(first) {
990 continue;
991 }
992
993 let mut control_payload = &frame.payload[..];
994 let control =
995 ConnectedControlPacket::decode(&mut control_payload).map_err(|error| {
996 RaknetClientError::HandshakeProtocolViolation {
997 details: format!("failed to decode handshake control packet: {error}"),
998 }
999 })?;
1000
1001 match control {
1002 ConnectedControlPacket::ConnectionRequestAccepted(pkt) => {
1003 accepted = Some(pkt);
1004 }
1005 ConnectedControlPacket::ConnectedPing(ping) => {
1006 self.queue_connected_control_packet(
1007 ConnectedControlPacket::ConnectedPong(ConnectedPong {
1008 ping_time: ping.ping_time,
1009 pong_time: unix_timestamp_millis(),
1010 }),
1011 Reliability::Unreliable,
1012 0,
1013 RakPriority::Immediate,
1014 )?;
1015 }
1016 ConnectedControlPacket::DisconnectionNotification(pkt) => {
1017 return Err(RaknetClientError::Closed {
1018 reason: ClientDisconnectReason::RemoteDisconnectionNotification {
1019 reason_code: pkt.reason,
1020 },
1021 });
1022 }
1023 ConnectedControlPacket::DetectLostConnection(_) => {
1024 return Err(RaknetClientError::Closed {
1025 reason: ClientDisconnectReason::RemoteDetectLostConnection,
1026 });
1027 }
1028 ConnectedControlPacket::ConnectedPong(_)
1029 | ConnectedControlPacket::ConnectionRequest(_)
1030 | ConnectedControlPacket::NewIncomingConnection(_) => {}
1031 }
1032 }
1033
1034 self.flush_outbound_with_limits(
1035 self.config.max_new_datagrams_per_recv,
1036 self.config.max_new_bytes_per_recv,
1037 self.config.max_resend_datagrams_per_recv,
1038 self.config.max_resend_bytes_per_recv,
1039 )
1040 .await?;
1041
1042 Ok(accepted)
1043 }
1044
1045 async fn recv_packet_until(&mut self, deadline: Instant) -> ClientResult<Option<Vec<u8>>> {
1046 loop {
1047 let remaining = deadline.saturating_duration_since(Instant::now());
1048 if remaining.is_zero() {
1049 return Ok(None);
1050 }
1051
1052 let recv = match time::timeout(remaining, self.socket.recv_from(&mut self.recv_buffer))
1053 .await
1054 {
1055 Ok(result) => result,
1056 Err(_) => return Ok(None),
1057 }
1058 .map_err(RaknetClientError::from)?;
1059
1060 let (len, addr) = recv;
1061 if addr != self.server_addr {
1062 continue;
1063 }
1064
1065 let mut packet = Vec::with_capacity(len);
1066 packet.extend_from_slice(&self.recv_buffer[..len]);
1067 return Ok(Some(packet));
1068 }
1069 }
1070
1071 async fn process_inbound_packet(&mut self, len: usize) -> ClientResult<()> {
1072 let payload = &self.recv_buffer[..len];
1073 let Some(first) = payload.first().copied() else {
1074 return Ok(());
1075 };
1076
1077 if is_offline_packet_id(first) {
1078 self.pending_events
1079 .push_back(RaknetClientEvent::DecodeError {
1080 error: format!("unexpected offline packet id while connected: 0x{first:02x}"),
1081 });
1082 return Ok(());
1083 }
1084
1085 let mut src = payload;
1086 let datagram = match Datagram::decode(&mut src) {
1087 Ok(datagram) => datagram,
1088 Err(error) => {
1089 self.pending_events
1090 .push_back(RaknetClientEvent::DecodeError {
1091 error: error.to_string(),
1092 });
1093 return Ok(());
1094 }
1095 };
1096
1097 self.process_connected_datagram(datagram).await
1098 }
1099
1100 async fn process_connected_datagram(&mut self, datagram: Datagram) -> ClientResult<()> {
1101 let now = Instant::now();
1102 let frames = match self.session.ingest_datagram(datagram, now) {
1103 Ok(frames) => frames,
1104 Err(error) => {
1105 self.pending_events
1106 .push_back(RaknetClientEvent::DecodeError {
1107 error: error.to_string(),
1108 });
1109 return Ok(());
1110 }
1111 };
1112
1113 let receipts = self.session.process_incoming_receipts(now);
1114 for receipt_id in receipts.acked_receipt_ids {
1115 self.pending_events
1116 .push_back(RaknetClientEvent::ReceiptAcked { receipt_id });
1117 }
1118
1119 for frame in frames {
1120 let Some(first) = frame.payload.first().copied() else {
1121 continue;
1122 };
1123
1124 if is_connected_control_id(first) {
1125 let mut control_payload = &frame.payload[..];
1126 let control = match ConnectedControlPacket::decode(&mut control_payload) {
1127 Ok(control) => control,
1128 Err(error) => {
1129 self.pending_events
1130 .push_back(RaknetClientEvent::DecodeError {
1131 error: error.to_string(),
1132 });
1133 continue;
1134 }
1135 };
1136
1137 self.apply_connected_control(control)?;
1138 continue;
1139 }
1140
1141 self.pending_events.push_back(RaknetClientEvent::Packet {
1142 payload: frame.payload,
1143 reliability: frame.header.reliability,
1144 reliable_index: frame.reliable_index,
1145 sequence_index: frame.sequence_index,
1146 ordering_index: frame.ordering_index,
1147 ordering_channel: frame.ordering_channel,
1148 });
1149 }
1150
1151 self.session.force_control_flush_deadlines(now);
1154
1155 self.flush_outbound_with_limits(
1156 self.config.max_new_datagrams_per_recv,
1157 self.config.max_new_bytes_per_recv,
1158 self.config.max_resend_datagrams_per_recv,
1159 self.config.max_resend_bytes_per_recv,
1160 )
1161 .await
1162 }
1163
1164 fn apply_connected_control(&mut self, control: ConnectedControlPacket) -> ClientResult<()> {
1165 match control {
1166 ConnectedControlPacket::ConnectedPing(ping) => {
1167 self.queue_connected_control_packet(
1168 ConnectedControlPacket::ConnectedPong(ConnectedPong {
1169 ping_time: ping.ping_time,
1170 pong_time: unix_timestamp_millis(),
1171 }),
1172 Reliability::Unreliable,
1173 0,
1174 RakPriority::Immediate,
1175 )?;
1176 }
1177 ConnectedControlPacket::DisconnectionNotification(pkt) => {
1178 self.finish_close(ClientDisconnectReason::RemoteDisconnectionNotification {
1179 reason_code: pkt.reason,
1180 });
1181 }
1182 ConnectedControlPacket::DetectLostConnection(DetectLostConnection) => {
1183 self.finish_close(ClientDisconnectReason::RemoteDetectLostConnection);
1184 }
1185 ConnectedControlPacket::ConnectionRequest(_)
1186 | ConnectedControlPacket::ConnectionRequestAccepted(_)
1187 | ConnectedControlPacket::NewIncomingConnection(_)
1188 | ConnectedControlPacket::ConnectedPong(_) => {}
1189 }
1190
1191 Ok(())
1192 }
1193
1194 fn queue_connected_control_packet(
1195 &mut self,
1196 packet: ConnectedControlPacket,
1197 reliability: Reliability,
1198 channel: u8,
1199 priority: RakPriority,
1200 ) -> ClientResult<()> {
1201 let mut out = BytesMut::new();
1202 packet.encode(&mut out).map_err(invalid_data_client_error)?;
1203 self.queue_payload_with_optional_receipt(out.freeze(), reliability, channel, priority, None)
1204 }
1205
1206 fn queue_payload_with_optional_receipt(
1207 &mut self,
1208 payload: Bytes,
1209 reliability: Reliability,
1210 channel: u8,
1211 priority: RakPriority,
1212 receipt_id: Option<u64>,
1213 ) -> ClientResult<()> {
1214 let decision = if let Some(receipt_id) = receipt_id {
1215 self.session.queue_payload_with_receipt(
1216 payload,
1217 reliability,
1218 channel,
1219 priority,
1220 Some(receipt_id),
1221 )
1222 } else {
1223 self.session
1224 .queue_payload(payload, reliability, channel, priority)
1225 };
1226
1227 match decision {
1228 QueuePayloadResult::Enqueued { .. } => Ok(()),
1229 QueuePayloadResult::Dropped => Err(RaknetClientError::BackpressureDropped),
1230 QueuePayloadResult::Deferred => Err(RaknetClientError::BackpressureDeferred),
1231 QueuePayloadResult::DisconnectRequested => {
1232 self.finish_close(ClientDisconnectReason::Backpressure);
1233 Err(RaknetClientError::BackpressureDisconnect)
1234 }
1235 }
1236 }
1237
1238 async fn flush_outbound_with_limits(
1239 &mut self,
1240 max_new_datagrams: usize,
1241 max_new_bytes: usize,
1242 max_resend_datagrams: usize,
1243 max_resend_bytes: usize,
1244 ) -> ClientResult<()> {
1245 if self.closed {
1246 return Ok(());
1247 }
1248
1249 self.queue_keepalive_ping();
1250
1251 let now = Instant::now();
1252 let datagrams = self.session.on_tick(
1253 now,
1254 max_new_datagrams,
1255 max_new_bytes,
1256 max_resend_datagrams,
1257 max_resend_bytes,
1258 );
1259
1260 for datagram in &datagrams {
1261 self.send_datagram(datagram).await?;
1262 }
1263
1264 if self.session.take_backpressure_disconnect() {
1265 self.finish_close(ClientDisconnectReason::Backpressure);
1266 return Err(RaknetClientError::BackpressureDisconnect);
1267 }
1268
1269 Ok(())
1270 }
1271
1272 fn queue_keepalive_ping(&mut self) {
1273 let now = Instant::now();
1274 if !self
1275 .session
1276 .should_send_keepalive(now, self.config.session_keepalive_interval)
1277 {
1278 return;
1279 }
1280
1281 let ping = ConnectedControlPacket::ConnectedPing(ConnectedPing {
1282 ping_time: unix_timestamp_millis(),
1283 });
1284
1285 let decision = {
1286 let mut out = BytesMut::new();
1287 if ping.encode(&mut out).is_err() {
1288 return;
1289 }
1290 self.session
1291 .queue_payload(out.freeze(), Reliability::Unreliable, 0, RakPriority::Low)
1292 };
1293
1294 if matches!(decision, QueuePayloadResult::Enqueued { .. }) {
1295 self.session.mark_keepalive_sent(now);
1296 }
1297 }
1298
1299 fn check_idle_timeout_and_close(&mut self) -> bool {
1300 if self.closed
1301 || self.config.session_idle_timeout.is_zero()
1302 || self.session.state() != SessionState::Connected
1303 {
1304 return false;
1305 }
1306
1307 let idle = Instant::now().saturating_duration_since(self.last_inbound_activity);
1308 if idle >= self.config.session_idle_timeout {
1309 self.finish_close(ClientDisconnectReason::IdleTimeout);
1310 return true;
1311 }
1312
1313 false
1314 }
1315
1316 async fn send_offline_packet(&self, packet: &OfflinePacket) -> ClientResult<()> {
1317 let mut out = BytesMut::new();
1318 packet.encode(&mut out).map_err(invalid_data_client_error)?;
1319 let _written = self
1320 .socket
1321 .send_to(&out, self.server_addr)
1322 .await
1323 .map_err(RaknetClientError::from)?;
1324 Ok(())
1325 }
1326
1327 async fn send_datagram(&self, datagram: &Datagram) -> ClientResult<()> {
1328 let mut out = BytesMut::with_capacity(datagram.encoded_size());
1329 datagram
1330 .encode(&mut out)
1331 .map_err(invalid_data_client_error)?;
1332 let _written = self
1333 .socket
1334 .send_to(&out, self.server_addr)
1335 .await
1336 .map_err(RaknetClientError::from)?;
1337 Ok(())
1338 }
1339
1340 fn ensure_open(&self) -> ClientResult<()> {
1341 if self.closed {
1342 return Err(RaknetClientError::Closed {
1343 reason: self.close_reason.clone().unwrap_or(
1344 ClientDisconnectReason::TransportError {
1345 message: "closed without explicit reason".to_string(),
1346 },
1347 ),
1348 });
1349 }
1350 Ok(())
1351 }
1352
1353 fn finish_close(&mut self, reason: ClientDisconnectReason) {
1354 if self.closed {
1355 return;
1356 }
1357
1358 if self.session.state() == SessionState::Connected {
1359 let _ = self.session.transition_to(SessionState::Closing);
1360 }
1361 let _ = self.session.transition_to(SessionState::Closed);
1362
1363 self.close_reason = Some(reason.clone());
1364 self.closed = true;
1365 match &reason {
1366 ClientDisconnectReason::Requested
1367 | ClientDisconnectReason::RemoteDisconnectionNotification { .. }
1368 | ClientDisconnectReason::RemoteDetectLostConnection => {
1369 info!(server_addr = %self.server_addr, ?reason, "client closed")
1370 }
1371 ClientDisconnectReason::Backpressure
1372 | ClientDisconnectReason::IdleTimeout
1373 | ClientDisconnectReason::TransportError { .. } => {
1374 warn!(server_addr = %self.server_addr, ?reason, "client closed")
1375 }
1376 }
1377 self.pending_events
1378 .push_back(RaknetClientEvent::Disconnected { reason });
1379 }
1380}
1381
1382fn should_retry_connect(error: &RaknetClientError, policy: &ReconnectPolicy) -> bool {
1383 match error {
1384 RaknetClientError::OfflineRejected { .. } => !policy.fast_fail_on_offline_rejection,
1385 RaknetClientError::HandshakeTimeout { .. } => policy.retry_on_handshake_timeout,
1386 RaknetClientError::Io { .. } => policy.retry_on_io,
1387 RaknetClientError::InvalidConfig { .. } => false,
1388 RaknetClientError::HandshakeProtocolViolation { .. }
1389 | RaknetClientError::Closed { .. }
1390 | RaknetClientError::BackpressureDropped
1391 | RaknetClientError::BackpressureDeferred
1392 | RaknetClientError::BackpressureDisconnect => false,
1393 }
1394}
1395
1396fn next_backoff(current: Duration, policy: &ReconnectPolicy) -> Duration {
1397 if current.is_zero() {
1398 return Duration::from_millis(1).min(policy.max_backoff);
1399 }
1400
1401 let doubled = current.saturating_mul(2);
1402 if doubled > policy.max_backoff {
1403 policy.max_backoff
1404 } else {
1405 doubled
1406 }
1407}
1408
1409fn invalid_data_client_error<E: std::fmt::Display>(error: E) -> RaknetClientError {
1410 RaknetClientError::HandshakeProtocolViolation {
1411 details: error.to_string(),
1412 }
1413}
1414
1415fn offline_rejection_reason(packet: &OfflinePacket) -> Option<OfflineRejectionReason> {
1416 match packet {
1417 OfflinePacket::IncompatibleProtocolVersion(pkt) => {
1418 Some(OfflineRejectionReason::IncompatibleProtocolVersion {
1419 protocol_version: pkt.protocol_version,
1420 server_guid: pkt.server_guid,
1421 })
1422 }
1423 OfflinePacket::ConnectionRequestFailed(pkt) => {
1424 Some(OfflineRejectionReason::ConnectionRequestFailed {
1425 server_guid: pkt.server_guid,
1426 })
1427 }
1428 OfflinePacket::AlreadyConnected(pkt) => Some(OfflineRejectionReason::AlreadyConnected {
1429 server_guid: pkt.server_guid,
1430 }),
1431 OfflinePacket::NoFreeIncomingConnections(pkt) => {
1432 Some(OfflineRejectionReason::NoFreeIncomingConnections {
1433 server_guid: pkt.server_guid,
1434 })
1435 }
1436 OfflinePacket::ConnectionBanned(pkt) => Some(OfflineRejectionReason::ConnectionBanned {
1437 server_guid: pkt.server_guid,
1438 }),
1439 OfflinePacket::IpRecentlyConnected(pkt) => {
1440 Some(OfflineRejectionReason::IpRecentlyConnected {
1441 server_guid: pkt.server_guid,
1442 })
1443 }
1444 _ => None,
1445 }
1446}
1447
/// Returns `true` when `id` is one of the packet ids this client classifies
/// as an offline (unconnected) message.
///
/// Consecutive ids are written as inclusive ranges; the accepted set is
/// `0x01, 0x02, 0x05..=0x08, 0x11, 0x12, 0x14, 0x17, 0x19, 0x1A, 0x1C`.
fn is_offline_packet_id(id: u8) -> bool {
    matches!(
        id,
        0x01..=0x02 | 0x05..=0x08 | 0x11..=0x12 | 0x14 | 0x17 | 0x19..=0x1A | 0x1C
    )
}
1454
/// Returns `true` when `id` is one of the packet ids this client classifies
/// as a connected-control message (as opposed to an application payload).
fn is_connected_control_id(id: u8) -> bool {
    const CONNECTED_CONTROL_IDS: [u8; 7] = [0x00, 0x03, 0x04, 0x09, 0x10, 0x13, 0x15];
    CONNECTED_CONTROL_IDS.contains(&id)
}
1458
/// Picks the local address to bind when the caller did not specify one.
///
/// The result is the unspecified address of the same IP family as
/// `server_addr` with port 0. For IPv6 the server's scope id is carried over
/// and the flow info is zero.
fn default_bind_addr_for_server(server_addr: SocketAddr) -> SocketAddr {
    const ANY_PORT: u16 = 0;
    const NO_FLOWINFO: u32 = 0;

    match server_addr {
        SocketAddr::V4(_) => SocketAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, ANY_PORT)),
        SocketAddr::V6(remote) => SocketAddr::from(SocketAddrV6::new(
            Ipv6Addr::UNSPECIFIED,
            ANY_PORT,
            NO_FLOWINFO,
            remote.scope_id(),
        )),
    }
}
1470
1471fn build_internal_addrs(server_addr: SocketAddr) -> [SocketAddr; SYSTEM_ADDRESS_COUNT] {
1472 let fallback = match server_addr {
1473 SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
1474 SocketAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(
1475 Ipv6Addr::UNSPECIFIED,
1476 0,
1477 0,
1478 v6.scope_id(),
1479 )),
1480 };
1481
1482 let mut addrs = [fallback; SYSTEM_ADDRESS_COUNT];
1483 addrs[0] = server_addr;
1484 addrs
1485}
1486
1487fn random_guid() -> u64 {
1488 let mut bytes = [0u8; 8];
1489 if getrandom::getrandom(&mut bytes).is_ok() {
1490 return u64::from_le_bytes(bytes);
1491 }
1492
1493 let now = unix_timestamp_millis() as u64;
1494 now ^ 0xA5A5_5A5A_DEAD_BEEF
1495}
1496
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The value is clamped to `i64::MAX`, and `0` is returned when the system
/// clock reports a time before the epoch.
fn unix_timestamp_millis() -> i64 {
    const CLAMP: u128 = i64::MAX as u128;

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // The clamp guarantees the value fits, so the cast cannot truncate.
        .map(|since_epoch| since_epoch.as_millis().min(CLAMP) as i64)
        .unwrap_or(0)
}
1503
#[cfg(test)]
mod tests {
    use super::{
        ClientSendOptions, OfflineRejectionReason, RaknetClientConfig, RaknetClientError,
        ReconnectPolicy, is_connected_control_id, is_offline_packet_id, next_backoff,
        offline_rejection_reason, should_retry_connect,
    };
    use crate::handshake::{ConnectionBanned, OfflinePacket};
    use crate::protocol::reliability::Reliability;
    use crate::session::RakPriority;
    use std::time::Duration;

    /// Spot-checks both packet-id classifiers with accepted and rejected ids.
    #[test]
    fn id_guards_cover_known_ranges() {
        for accepted in [0x05u8, 0x1C] {
            assert!(is_offline_packet_id(accepted));
        }
        assert!(!is_offline_packet_id(0xFF));

        assert!(is_connected_control_id(0x10));
        assert!(!is_connected_control_id(0x11));
    }

    /// The default send options are reliable-ordered, channel 0, high priority.
    #[test]
    fn send_options_default_matches_server_defaults() {
        let ClientSendOptions {
            reliability,
            channel,
            priority,
        } = ClientSendOptions::default();

        assert_eq!(reliability, Reliability::ReliableOrdered);
        assert_eq!(channel, 0);
        assert_eq!(priority, RakPriority::High);
    }

    /// A `ConnectionBanned` packet maps to the matching rejection reason.
    #[test]
    fn rejection_mapping_extracts_reason() {
        let banned = OfflinePacket::ConnectionBanned(ConnectionBanned {
            server_guid: 7,
            magic: crate::protocol::constants::DEFAULT_UNCONNECTED_MAGIC,
        });

        let mapped = offline_rejection_reason(&banned);
        assert_eq!(
            mapped,
            Some(OfflineRejectionReason::ConnectionBanned { server_guid: 7 })
        );
    }

    /// Offline rejections are retried only when fast-fail is switched off.
    #[test]
    fn retry_policy_fast_fail_respects_offline_rejection() {
        let rejected = RaknetClientError::OfflineRejected {
            reason: OfflineRejectionReason::ConnectionBanned { server_guid: 1 },
        };

        let strict = ReconnectPolicy::default();
        assert!(!should_retry_connect(&rejected, &strict));

        let relaxed = ReconnectPolicy {
            fast_fail_on_offline_rejection: false,
            ..strict
        };
        assert!(should_retry_connect(&rejected, &relaxed));
    }

    /// Doubling is applied until the configured cap takes over.
    #[test]
    fn backoff_growth_respects_cap() {
        let policy = ReconnectPolicy {
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_millis(250),
            ..ReconnectPolicy::default()
        };

        // (current delay in ms, expected next delay in ms)
        let steps = [(100u64, 200u64), (200, 250)];
        for (current_ms, expected_ms) in steps {
            assert_eq!(
                next_backoff(Duration::from_millis(current_ms), &policy),
                Duration::from_millis(expected_ms)
            );
        }
    }

    /// A config with an explicit MTU and an empty probe order still validates.
    #[test]
    fn mtu_probe_candidates_include_configured_mtu_even_when_order_empty() {
        let config = RaknetClientConfig {
            mtu: 1300,
            mtu_probe_order: Vec::new(),
            ..RaknetClientConfig::default()
        };

        assert_eq!(config.mtu, 1300);
        config.validate().expect("config should be valid");
    }

    /// An MTU of 10 is rejected and the error names the offending field.
    #[test]
    fn client_config_validate_rejects_out_of_range_mtu() {
        let config = RaknetClientConfig {
            mtu: 10,
            ..RaknetClientConfig::default()
        };

        let rejection = config
            .validate()
            .expect_err("invalid MTU must be rejected by validate()");
        assert_eq!(rejection.config, "RaknetClientConfig");
        assert_eq!(rejection.field, "mtu");
    }

    /// A reconnect policy with zero attempts is rejected by validation.
    #[test]
    fn reconnect_policy_validate_rejects_zero_attempts() {
        let policy = ReconnectPolicy {
            max_attempts: 0,
            ..ReconnectPolicy::default()
        };

        let rejection = policy
            .validate()
            .expect_err("max_attempts=0 must be rejected");
        assert_eq!(rejection.config, "ReconnectPolicy");
        assert_eq!(rejection.field, "max_attempts");
    }
}