1use std::collections::VecDeque;
2use std::io;
3use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
4use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6use bytes::{Bytes, BytesMut};
7use thiserror::Error;
8use tokio::net::UdpSocket;
9use tokio::time::{self, sleep};
10use tracing::{debug, info, warn};
11
12use crate::error::ConfigValidationError;
13use crate::handshake::{
14 OfflinePacket, OpenConnectionReply1, OpenConnectionReply2, OpenConnectionRequest1,
15 OpenConnectionRequest2, Request2ParsePath,
16};
17use crate::protocol::connected::{
18 ConnectedControlPacket, ConnectedPing, ConnectedPong, ConnectionRequest,
19 ConnectionRequestAccepted, DetectLostConnection, DisconnectionNotification,
20 NewIncomingConnection, SYSTEM_ADDRESS_COUNT,
21};
22use crate::protocol::constants::{
23 DEFAULT_UNCONNECTED_MAGIC, MAXIMUM_MTU_SIZE, MINIMUM_MTU_SIZE, MTU_PROBE_ORDER,
24 RAKNET_PROTOCOL_VERSION,
25};
26use crate::protocol::datagram::Datagram;
27use crate::protocol::reliability::Reliability;
28use crate::protocol::sequence24::Sequence24;
29use crate::session::{
30 QueuePayloadResult, RakPriority, Session, SessionMetricsSnapshot, SessionState,
31};
32
/// Result alias used throughout the client API; the error type is always
/// [`RaknetClientError`].
pub type ClientResult<T> = Result<T, RaknetClientError>;
34
/// Per-payload delivery options accepted by `RaknetClient::send_with_options` and
/// `RaknetClient::send_with_receipt`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClientSendOptions {
    /// Reliability mode handed to the session when the payload is queued.
    pub reliability: Reliability,
    /// Channel number handed to the session when the payload is queued.
    pub channel: u8,
    /// Queueing priority handed to the session.
    pub priority: RakPriority,
}
41
42impl Default for ClientSendOptions {
43 fn default() -> Self {
44 Self {
45 reliability: Reliability::ReliableOrdered,
46 channel: 0,
47 priority: RakPriority::High,
48 }
49 }
50}
51
/// Tunables for establishing and running a [`RaknetClient`] connection.
///
/// Call [`RaknetClientConfig::validate`] to check a configuration; the connect
/// functions do so before opening a socket.
#[derive(Debug, Clone)]
pub struct RaknetClientConfig {
    /// Local address to bind the UDP socket to. When `None`, the address is chosen by
    /// `default_bind_addr_for_server` based on the server address.
    pub local_addr: Option<SocketAddr>,
    /// Client GUID sent in OpenConnectionRequest2 and in the connected ConnectionRequest.
    pub guid: u64,
    /// Protocol version sent in OpenConnectionRequest1.
    pub protocol_version: u8,
    /// Initial session MTU; also appended as the last MTU probe candidate.
    /// Must lie within `[MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE]`.
    pub mtu: u16,
    /// MTU values probed, in order, while waiting for OpenConnectionReply1.
    pub mtu_probe_order: Vec<u16>,
    /// How many OpenConnectionRequest1 packets are sent per MTU candidate (>= 1).
    pub mtu_probe_attempts_per_step: usize,
    /// How long to wait for a reply after each probe packet (> 0).
    pub mtu_probe_wait_per_attempt: Duration,
    /// Overall deadline for the whole handshake (> 0).
    pub handshake_timeout: Duration,
    /// How long `next_event` waits for a datagram before running an outbound flush (> 0).
    pub outbound_tick_interval: Duration,
    /// Keepalive interval (> 0).
    // NOTE(review): presumably consumed by `queue_keepalive_ping` — confirm.
    pub session_keepalive_interval: Duration,
    /// Idle timeout (> 0).
    // NOTE(review): presumably consumed by `check_idle_timeout_and_close` — confirm.
    pub session_idle_timeout: Duration,
    /// Size in bytes of the receive buffer; must be at least `mtu`.
    pub recv_buffer_capacity: usize,
    /// Limit on new datagrams per flush triggered by the outbound tick (>= 1).
    pub max_new_datagrams_per_tick: usize,
    /// Limit on new bytes per flush triggered by the outbound tick (>= `mtu`).
    pub max_new_bytes_per_tick: usize,
    /// Limit on resent datagrams per flush triggered by the outbound tick (>= 1).
    pub max_resend_datagrams_per_tick: usize,
    /// Limit on resent bytes per flush triggered by the outbound tick (>= `mtu`).
    pub max_resend_bytes_per_tick: usize,
    /// Limit on new datagrams per flush triggered by a send or a received datagram (>= 1).
    pub max_new_datagrams_per_recv: usize,
    /// Limit on new bytes per flush triggered by a send or a received datagram (>= `mtu`).
    pub max_new_bytes_per_recv: usize,
    /// Limit on resent datagrams per flush triggered by a send or a received datagram (>= 1).
    pub max_resend_datagrams_per_recv: usize,
    /// Limit on resent bytes per flush triggered by a send or a received datagram (>= `mtu`).
    pub max_resend_bytes_per_recv: usize,
}
75
76impl Default for RaknetClientConfig {
77 fn default() -> Self {
78 Self {
79 local_addr: None,
80 guid: random_guid(),
81 protocol_version: RAKNET_PROTOCOL_VERSION,
82 mtu: 1200,
83 mtu_probe_order: MTU_PROBE_ORDER.to_vec(),
84 mtu_probe_attempts_per_step: 2,
85 mtu_probe_wait_per_attempt: Duration::from_millis(350),
86 handshake_timeout: Duration::from_secs(5),
87 outbound_tick_interval: Duration::from_millis(10),
88 session_keepalive_interval: Duration::from_secs(10),
89 session_idle_timeout: Duration::from_secs(30),
90 recv_buffer_capacity: MAXIMUM_MTU_SIZE as usize,
91 max_new_datagrams_per_tick: 8,
92 max_new_bytes_per_tick: 64 * 1024,
93 max_resend_datagrams_per_tick: 8,
94 max_resend_bytes_per_tick: 64 * 1024,
95 max_new_datagrams_per_recv: 6,
96 max_new_bytes_per_recv: 64 * 1024,
97 max_resend_datagrams_per_recv: 6,
98 max_resend_bytes_per_recv: 64 * 1024,
99 }
100 }
101}
102
103impl RaknetClientConfig {
104 pub fn validate(&self) -> Result<(), ConfigValidationError> {
105 if !(MINIMUM_MTU_SIZE..=MAXIMUM_MTU_SIZE).contains(&self.mtu) {
106 return Err(ConfigValidationError::new(
107 "RaknetClientConfig",
108 "mtu",
109 format!(
110 "must be within [{MINIMUM_MTU_SIZE}, {MAXIMUM_MTU_SIZE}], got {}",
111 self.mtu
112 ),
113 ));
114 }
115 if self.mtu_probe_attempts_per_step == 0 {
116 return Err(ConfigValidationError::new(
117 "RaknetClientConfig",
118 "mtu_probe_attempts_per_step",
119 "must be >= 1",
120 ));
121 }
122 if self.mtu_probe_wait_per_attempt.is_zero() {
123 return Err(ConfigValidationError::new(
124 "RaknetClientConfig",
125 "mtu_probe_wait_per_attempt",
126 "must be > 0",
127 ));
128 }
129 if self.handshake_timeout.is_zero() {
130 return Err(ConfigValidationError::new(
131 "RaknetClientConfig",
132 "handshake_timeout",
133 "must be > 0",
134 ));
135 }
136 if self.outbound_tick_interval.is_zero() {
137 return Err(ConfigValidationError::new(
138 "RaknetClientConfig",
139 "outbound_tick_interval",
140 "must be > 0",
141 ));
142 }
143 if self.session_keepalive_interval.is_zero() {
144 return Err(ConfigValidationError::new(
145 "RaknetClientConfig",
146 "session_keepalive_interval",
147 "must be > 0",
148 ));
149 }
150 if self.session_idle_timeout.is_zero() {
151 return Err(ConfigValidationError::new(
152 "RaknetClientConfig",
153 "session_idle_timeout",
154 "must be > 0",
155 ));
156 }
157 if self.recv_buffer_capacity < self.mtu as usize {
158 return Err(ConfigValidationError::new(
159 "RaknetClientConfig",
160 "recv_buffer_capacity",
161 format!(
162 "must be >= mtu ({}), got {}",
163 self.mtu, self.recv_buffer_capacity
164 ),
165 ));
166 }
167 if self.max_new_datagrams_per_tick == 0 {
168 return Err(ConfigValidationError::new(
169 "RaknetClientConfig",
170 "max_new_datagrams_per_tick",
171 "must be >= 1",
172 ));
173 }
174 if self.max_new_bytes_per_tick < self.mtu as usize {
175 return Err(ConfigValidationError::new(
176 "RaknetClientConfig",
177 "max_new_bytes_per_tick",
178 format!(
179 "must be >= mtu ({}), got {}",
180 self.mtu, self.max_new_bytes_per_tick
181 ),
182 ));
183 }
184 if self.max_resend_datagrams_per_tick == 0 {
185 return Err(ConfigValidationError::new(
186 "RaknetClientConfig",
187 "max_resend_datagrams_per_tick",
188 "must be >= 1",
189 ));
190 }
191 if self.max_resend_bytes_per_tick < self.mtu as usize {
192 return Err(ConfigValidationError::new(
193 "RaknetClientConfig",
194 "max_resend_bytes_per_tick",
195 format!(
196 "must be >= mtu ({}), got {}",
197 self.mtu, self.max_resend_bytes_per_tick
198 ),
199 ));
200 }
201 if self.max_new_datagrams_per_recv == 0 {
202 return Err(ConfigValidationError::new(
203 "RaknetClientConfig",
204 "max_new_datagrams_per_recv",
205 "must be >= 1",
206 ));
207 }
208 if self.max_new_bytes_per_recv < self.mtu as usize {
209 return Err(ConfigValidationError::new(
210 "RaknetClientConfig",
211 "max_new_bytes_per_recv",
212 format!(
213 "must be >= mtu ({}), got {}",
214 self.mtu, self.max_new_bytes_per_recv
215 ),
216 ));
217 }
218 if self.max_resend_datagrams_per_recv == 0 {
219 return Err(ConfigValidationError::new(
220 "RaknetClientConfig",
221 "max_resend_datagrams_per_recv",
222 "must be >= 1",
223 ));
224 }
225 if self.max_resend_bytes_per_recv < self.mtu as usize {
226 return Err(ConfigValidationError::new(
227 "RaknetClientConfig",
228 "max_resend_bytes_per_recv",
229 format!(
230 "must be >= mtu ({}), got {}",
231 self.mtu, self.max_resend_bytes_per_recv
232 ),
233 ));
234 }
235
236 Ok(())
237 }
238}
239
/// Identifies the handshake step that was in progress when a
/// [`RaknetClientError::HandshakeTimeout`] was raised.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeStage {
    /// Probing with OpenConnectionRequest1 and waiting for reply1.
    OpenConnectionRequest1,
    /// OpenConnectionRequest2 was sent; waiting for reply2.
    OpenConnectionRequest2,
    /// The connected ConnectionRequest was sent; waiting for ConnectionRequestAccepted.
    ConnectionRequestAccepted,
}
246
/// Reason carried by [`RaknetClientError::OfflineRejected`] when the server answers
/// the handshake with an offline rejection packet.
///
/// Values are produced by `offline_rejection_reason` from a decoded offline packet;
/// every variant records the GUID reported by the server.
// NOTE(review): the per-variant meanings below are read off the variant names —
// confirm against the offline packet definitions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OfflineRejectionReason {
    /// The server reported an incompatible protocol version.
    IncompatibleProtocolVersion {
        // NOTE(review): presumably the version the server supports — confirm.
        protocol_version: u8,
        server_guid: u64,
    },
    /// The server reported that the connection request failed.
    ConnectionRequestFailed {
        server_guid: u64,
    },
    /// The server reported that this client is already connected.
    AlreadyConnected {
        server_guid: u64,
    },
    /// The server reported that it has no free incoming connections.
    NoFreeIncomingConnections {
        server_guid: u64,
    },
    /// The server reported that the connection is banned.
    ConnectionBanned {
        server_guid: u64,
    },
    /// The server reported that this IP connected too recently.
    IpRecentlyConnected {
        server_guid: u64,
    },
}
269
/// Errors returned by the client API.
///
/// The type is `Clone + PartialEq + Eq`, so underlying I/O and validation errors are
/// stored as their rendered message rather than as the original error value.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum RaknetClientError {
    /// A socket operation failed; `message` is the rendered `io::Error`.
    #[error("transport io error: {message}")]
    Io { message: String },
    /// A `RaknetClientConfig` or `ReconnectPolicy` failed validation.
    #[error("invalid client config: {details}")]
    InvalidConfig { details: String },
    /// The handshake deadline (or the MTU probe schedule) ran out at `stage`.
    #[error("handshake timed out at stage {stage:?}")]
    HandshakeTimeout { stage: HandshakeStage },
    /// The server answered the handshake with an offline rejection packet.
    #[error("handshake rejected by server: {reason:?}")]
    OfflineRejected { reason: OfflineRejectionReason },
    /// The handshake could not proceed: a session state transition was refused or a
    /// handshake control packet failed to decode.
    #[error("handshake protocol violation: {details}")]
    HandshakeProtocolViolation { details: String },
    /// The connection is closed; during the handshake this is returned when the
    /// server sends a disconnect or lost-connection control packet.
    #[error("client closed: {reason:?}")]
    Closed { reason: ClientDisconnectReason },
    /// The session reported `QueuePayloadResult::Dropped` for the payload.
    #[error("payload dropped by backpressure")]
    BackpressureDropped,
    /// The session reported `QueuePayloadResult::Deferred` for the payload.
    #[error("payload deferred by backpressure")]
    BackpressureDeferred,
    /// The session reported `QueuePayloadResult::DisconnectRequested`; the client has
    /// been closed with `ClientDisconnectReason::Backpressure`.
    #[error("backpressure requested disconnect")]
    BackpressureDisconnect,
}
291
292impl From<io::Error> for RaknetClientError {
293 fn from(value: io::Error) -> Self {
294 Self::Io {
295 message: value.to_string(),
296 }
297 }
298}
299
300impl From<RaknetClientError> for io::Error {
301 fn from(value: RaknetClientError) -> Self {
302 io::Error::other(value.to_string())
303 }
304}
305
/// Why a client connection was closed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientDisconnectReason {
    /// The local side called `disconnect`.
    Requested,
    /// Queueing a payload made the session request a disconnect.
    Backpressure,
    /// The connection was idle for too long.
    // NOTE(review): presumably set by `check_idle_timeout_and_close` — confirm.
    IdleTimeout,
    /// The server sent a DisconnectionNotification, optionally with a reason code.
    RemoteDisconnectionNotification { reason_code: Option<u8> },
    /// The server sent a DetectLostConnection control packet.
    RemoteDetectLostConnection,
    /// Receiving, inbound processing, or an outbound tick failed; `message` describes
    /// which step failed and why.
    TransportError { message: String },
}
315
/// Events yielded by `RaknetClient::next_event`.
#[derive(Debug)]
pub enum RaknetClientEvent {
    /// The handshake completed; queued once by `connect_with_config`.
    Connected {
        /// Address of the server the client connected to.
        server_addr: SocketAddr,
        /// Session MTU after the handshake.
        mtu: u16,
    },
    /// A received frame that is not a connected control packet.
    Packet {
        /// Frame payload.
        payload: Bytes,
        /// Reliability taken from the frame header.
        reliability: Reliability,
        /// Reliable index of the frame, if present.
        reliable_index: Option<Sequence24>,
        /// Sequence index of the frame, if present.
        sequence_index: Option<Sequence24>,
        /// Ordering index of the frame, if present.
        ordering_index: Option<Sequence24>,
        /// Ordering channel of the frame, if present.
        ordering_channel: Option<u8>,
    },
    /// The session reported the receipt id as acknowledged.
    ReceiptAcked {
        /// Id that was passed to `send_with_receipt`.
        receipt_id: u64,
    },
    /// An inbound datagram or control packet could not be decoded, or an offline
    /// packet arrived while connected. The connection stays open.
    DecodeError {
        /// Rendered description of the failure.
        error: String,
    },
    /// The connection was closed.
    // NOTE(review): presumably queued by `finish_close` — confirm.
    Disconnected {
        /// Why the connection was closed.
        reason: ClientDisconnectReason,
    },
}
340
/// Retry behaviour for `RaknetClient::connect_with_retry`.
#[derive(Debug, Clone)]
pub struct ReconnectPolicy {
    /// Maximum number of connection attempts, including the first one (>= 1).
    pub max_attempts: usize,
    /// Delay before the second attempt; must not exceed `max_backoff`.
    pub initial_backoff: Duration,
    /// Upper bound that `initial_backoff` is validated against.
    // NOTE(review): presumably also the cap applied by `next_backoff` — confirm.
    pub max_backoff: Duration,
    /// Retry flag for I/O failures.
    // NOTE(review): interpreted by `should_retry_connect` — confirm exact semantics.
    pub retry_on_io: bool,
    /// Retry flag for handshake timeouts.
    // NOTE(review): interpreted by `should_retry_connect` — confirm exact semantics.
    pub retry_on_handshake_timeout: bool,
    /// Flag controlling whether an offline rejection stops retrying.
    // NOTE(review): interpreted by `should_retry_connect` — confirm exact semantics.
    pub fast_fail_on_offline_rejection: bool,
}

impl Default for ReconnectPolicy {
    /// Three attempts, backoff starting at 200 ms with a 3 s ceiling, and all three
    /// policy flags enabled.
    fn default() -> Self {
        let initial_backoff = Duration::from_millis(200);
        let max_backoff = Duration::from_secs(3);
        Self {
            max_attempts: 3,
            initial_backoff,
            max_backoff,
            retry_on_io: true,
            retry_on_handshake_timeout: true,
            fast_fail_on_offline_rejection: true,
        }
    }
}
363
364impl ReconnectPolicy {
365 pub fn validate(&self) -> Result<(), ConfigValidationError> {
366 if self.max_attempts == 0 {
367 return Err(ConfigValidationError::new(
368 "ReconnectPolicy",
369 "max_attempts",
370 "must be >= 1",
371 ));
372 }
373 if self.initial_backoff > self.max_backoff {
374 return Err(ConfigValidationError::new(
375 "ReconnectPolicy",
376 "initial_backoff",
377 format!(
378 "must be <= max_backoff ({}ms), got {}ms",
379 self.max_backoff.as_millis(),
380 self.initial_backoff.as_millis()
381 ),
382 ));
383 }
384 Ok(())
385 }
386}
387
/// A single client connection to one server over a dedicated UDP socket.
///
/// Created by the `connect*` constructors, which perform the full handshake before
/// returning. Drive the connection by calling `next_event` repeatedly.
pub struct RaknetClient {
    /// UDP socket bound for this connection.
    socket: UdpSocket,
    /// Address of the server; datagrams from any other address are ignored.
    server_addr: SocketAddr,
    /// Session state for the connected protocol layer.
    session: Session,
    /// Configuration the client was created with.
    config: RaknetClientConfig,
    /// Reusable receive buffer, `config.recv_buffer_capacity` bytes long.
    recv_buffer: Vec<u8>,
    /// Events waiting to be returned by `next_event`.
    pending_events: VecDeque<RaknetClientEvent>,
    /// Set once the connection has been closed.
    closed: bool,
    /// Reason recorded for the close, if any.
    close_reason: Option<ClientDisconnectReason>,
    /// Time of the most recent datagram received from the server.
    last_inbound_activity: Instant,
}
399
400impl RaknetClient {
401 pub async fn connect(server_addr: SocketAddr) -> ClientResult<Self> {
402 Self::connect_with_config(server_addr, RaknetClientConfig::default()).await
403 }
404
405 pub async fn connect_with_retry(
406 server_addr: SocketAddr,
407 config: RaknetClientConfig,
408 policy: ReconnectPolicy,
409 ) -> ClientResult<Self> {
410 policy
411 .validate()
412 .map_err(|error| RaknetClientError::InvalidConfig {
413 details: error.to_string(),
414 })?;
415
416 let mut attempt = 0usize;
417 let max_attempts = policy.max_attempts.max(1);
418 let mut backoff = policy.initial_backoff;
419 let mut last_error: Option<RaknetClientError> = None;
420
421 while attempt < max_attempts {
422 attempt = attempt.saturating_add(1);
423 match Self::connect_with_config(server_addr, config.clone()).await {
424 Ok(client) => return Ok(client),
425 Err(error) => {
426 let should_retry =
427 attempt < max_attempts && should_retry_connect(&error, &policy);
428 last_error = Some(error);
429 if !should_retry {
430 break;
431 }
432
433 if !backoff.is_zero() {
434 sleep(backoff).await;
435 }
436 backoff = next_backoff(backoff, &policy);
437 }
438 }
439 }
440
441 Err(
442 last_error.unwrap_or(RaknetClientError::HandshakeProtocolViolation {
443 details: "connect_with_retry terminated without a recorded error".to_string(),
444 }),
445 )
446 }
447
448 pub async fn connect_with_config(
449 server_addr: SocketAddr,
450 config: RaknetClientConfig,
451 ) -> ClientResult<Self> {
452 config
453 .validate()
454 .map_err(|error| RaknetClientError::InvalidConfig {
455 details: error.to_string(),
456 })?;
457
458 let local_addr = config
459 .local_addr
460 .unwrap_or_else(|| default_bind_addr_for_server(server_addr));
461 info!(%server_addr, %local_addr, "client connecting");
462 let socket = UdpSocket::bind(local_addr)
463 .await
464 .map_err(RaknetClientError::from)?;
465
466 let now = Instant::now();
467 let mut client = Self {
468 socket,
469 server_addr,
470 session: Session::new(config.mtu as usize),
471 recv_buffer: vec![0u8; config.recv_buffer_capacity],
472 config,
473 pending_events: VecDeque::new(),
474 closed: false,
475 close_reason: None,
476 last_inbound_activity: now,
477 };
478
479 client.perform_handshake().await?;
480 info!(
481 %server_addr,
482 mtu = client.session.mtu(),
483 "client handshake established"
484 );
485 client
486 .pending_events
487 .push_back(RaknetClientEvent::Connected {
488 server_addr,
489 mtu: client.session.mtu() as u16,
490 });
491
492 Ok(client)
493 }
494
    /// Returns the local address the client's UDP socket is bound to.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported by the socket.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.socket.local_addr()
    }
498
    /// Returns the server address this client was connected to.
    pub fn server_addr(&self) -> SocketAddr {
        self.server_addr
    }
502
    /// Returns the session's current metrics snapshot.
    pub fn metrics_snapshot(&self) -> SessionMetricsSnapshot {
        self.session.metrics_snapshot()
    }
506
507 pub async fn send(&mut self, payload: impl Into<Bytes>) -> ClientResult<()> {
508 self.send_with_options(payload, ClientSendOptions::default())
509 .await
510 }
511
512 pub async fn send_with_options(
513 &mut self,
514 payload: impl Into<Bytes>,
515 options: ClientSendOptions,
516 ) -> ClientResult<()> {
517 self.ensure_open()?;
518 self.queue_payload_with_optional_receipt(
519 payload.into(),
520 options.reliability,
521 options.channel,
522 options.priority,
523 None,
524 )?;
525 self.flush_outbound_with_limits(
526 self.config.max_new_datagrams_per_recv,
527 self.config.max_new_bytes_per_recv,
528 self.config.max_resend_datagrams_per_recv,
529 self.config.max_resend_bytes_per_recv,
530 )
531 .await
532 }
533
534 pub async fn send_with_receipt(
535 &mut self,
536 payload: impl Into<Bytes>,
537 receipt_id: u64,
538 options: ClientSendOptions,
539 ) -> ClientResult<()> {
540 self.ensure_open()?;
541 self.queue_payload_with_optional_receipt(
542 payload.into(),
543 options.reliability,
544 options.channel,
545 options.priority,
546 Some(receipt_id),
547 )?;
548 self.flush_outbound_with_limits(
549 self.config.max_new_datagrams_per_recv,
550 self.config.max_new_bytes_per_recv,
551 self.config.max_resend_datagrams_per_recv,
552 self.config.max_resend_bytes_per_recv,
553 )
554 .await
555 }
556
557 pub async fn disconnect(&mut self, reason_code: Option<u8>) -> ClientResult<()> {
558 if self.closed {
559 return Ok(());
560 }
561
562 let packet = ConnectedControlPacket::DisconnectionNotification(DisconnectionNotification {
563 reason: reason_code,
564 });
565
566 let queued = self.queue_connected_control_packet(
567 packet,
568 Reliability::ReliableOrdered,
569 0,
570 RakPriority::High,
571 );
572
573 if queued.is_ok() {
574 let _ = self
575 .flush_outbound_with_limits(
576 self.config.max_new_datagrams_per_tick,
577 self.config.max_new_bytes_per_tick,
578 self.config.max_resend_datagrams_per_tick,
579 self.config.max_resend_bytes_per_tick,
580 )
581 .await;
582 }
583
584 self.finish_close(ClientDisconnectReason::Requested);
585 queued
586 }
587
588 pub async fn next_event(&mut self) -> Option<RaknetClientEvent> {
589 if let Some(event) = self.pending_events.pop_front() {
590 return Some(event);
591 }
592 if self.closed {
593 return None;
594 }
595
596 loop {
597 if let Some(event) = self.pending_events.pop_front() {
598 return Some(event);
599 }
600 if self.closed {
601 return None;
602 }
603 if self.check_idle_timeout_and_close() {
604 continue;
605 }
606
607 match time::timeout(
608 self.config.outbound_tick_interval,
609 self.socket.recv_from(&mut self.recv_buffer),
610 )
611 .await
612 {
613 Ok(Ok((len, addr))) => {
614 if addr != self.server_addr {
615 continue;
616 }
617 self.last_inbound_activity = Instant::now();
618
619 if let Err(error) = self.process_inbound_packet(len).await {
620 self.finish_close(ClientDisconnectReason::TransportError {
621 message: format!("inbound processing failed: {error}"),
622 });
623 }
624 }
625 Ok(Err(error)) => {
626 self.finish_close(ClientDisconnectReason::TransportError {
627 message: format!("udp receive failed: {error}"),
628 });
629 }
630 Err(_) => {
631 if let Err(error) = self
632 .flush_outbound_with_limits(
633 self.config.max_new_datagrams_per_tick,
634 self.config.max_new_bytes_per_tick,
635 self.config.max_resend_datagrams_per_tick,
636 self.config.max_resend_bytes_per_tick,
637 )
638 .await
639 {
640 self.finish_close(ClientDisconnectReason::TransportError {
641 message: format!("outbound tick failed: {error}"),
642 });
643 }
644 }
645 }
646 }
647 }
648
649 async fn perform_handshake(&mut self) -> ClientResult<()> {
650 debug!(server_addr = %self.server_addr, "starting client handshake");
651 if !self.session.transition_to(SessionState::Req1Recv) {
652 return Err(RaknetClientError::HandshakeProtocolViolation {
653 details: "session transition failed before request1".to_string(),
654 });
655 }
656
657 let deadline = Instant::now() + self.config.handshake_timeout;
658 let reply1 = self.probe_open_connection_reply1(deadline).await?;
659
660 if !self.session.transition_to(SessionState::Reply1Sent) {
661 return Err(RaknetClientError::HandshakeProtocolViolation {
662 details: "session transition failed after reply1".to_string(),
663 });
664 }
665
666 self.session
667 .set_mtu(reply1.mtu.clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE) as usize);
668
669 if !self.session.transition_to(SessionState::Req2Recv) {
670 return Err(RaknetClientError::HandshakeProtocolViolation {
671 details: "session transition failed before request2".to_string(),
672 });
673 }
674
675 let req2 = OfflinePacket::OpenConnectionRequest2(OpenConnectionRequest2 {
676 server_addr: self.server_addr,
677 mtu: reply1.mtu,
678 client_guid: self.config.guid,
679 cookie: reply1.cookie,
680 client_proof: false,
681 parse_path: if reply1.cookie.is_some() {
682 Request2ParsePath::StrictWithCookie
683 } else {
684 Request2ParsePath::StrictNoCookie
685 },
686 magic: DEFAULT_UNCONNECTED_MAGIC,
687 });
688 self.send_offline_packet(&req2).await?;
689
690 let _reply2 = self.wait_for_open_connection_reply2(deadline).await?;
691 if !self.session.transition_to(SessionState::Reply2Sent) {
692 return Err(RaknetClientError::HandshakeProtocolViolation {
693 details: "session transition failed after reply2".to_string(),
694 });
695 }
696
697 if !self.session.transition_to(SessionState::ConnReqRecv) {
698 return Err(RaknetClientError::HandshakeProtocolViolation {
699 details: "session transition failed before connection request".to_string(),
700 });
701 }
702
703 let request_time = unix_timestamp_millis();
704 self.queue_connected_control_packet(
705 ConnectedControlPacket::ConnectionRequest(ConnectionRequest {
706 client_guid: self.config.guid,
707 request_time,
708 use_encryption: false,
709 }),
710 Reliability::ReliableOrdered,
711 0,
712 RakPriority::High,
713 )?;
714 self.flush_outbound_with_limits(
715 self.config.max_new_datagrams_per_recv,
716 self.config.max_new_bytes_per_recv,
717 self.config.max_resend_datagrams_per_recv,
718 self.config.max_resend_bytes_per_recv,
719 )
720 .await?;
721
722 let accepted = self.wait_for_connection_request_accepted(deadline).await?;
723
724 if !self
725 .session
726 .transition_to(SessionState::ConnReqAcceptedSent)
727 {
728 return Err(RaknetClientError::HandshakeProtocolViolation {
729 details: "session transition failed after request accepted".to_string(),
730 });
731 }
732
733 if !self.session.transition_to(SessionState::NewIncomingRecv) {
734 return Err(RaknetClientError::HandshakeProtocolViolation {
735 details: "session transition failed before new incoming".to_string(),
736 });
737 }
738
739 self.queue_connected_control_packet(
740 ConnectedControlPacket::NewIncomingConnection(NewIncomingConnection {
741 server_addr: self.server_addr,
742 internal_addrs: build_internal_addrs(self.server_addr),
743 request_time: accepted.request_time,
744 accepted_time: accepted.accepted_time,
745 }),
746 Reliability::ReliableOrdered,
747 0,
748 RakPriority::High,
749 )?;
750
751 self.flush_outbound_with_limits(
752 self.config.max_new_datagrams_per_recv,
753 self.config.max_new_bytes_per_recv,
754 self.config.max_resend_datagrams_per_recv,
755 self.config.max_resend_bytes_per_recv,
756 )
757 .await?;
758
759 if !self.session.transition_to(SessionState::Connected) {
760 return Err(RaknetClientError::HandshakeProtocolViolation {
761 details: "session transition failed to connected".to_string(),
762 });
763 }
764
765 debug!(server_addr = %self.server_addr, "client handshake completed");
766 self.last_inbound_activity = Instant::now();
767 Ok(())
768 }
769
770 async fn probe_open_connection_reply1(
771 &mut self,
772 overall_deadline: Instant,
773 ) -> ClientResult<OpenConnectionReply1> {
774 let candidates = self.mtu_probe_candidates();
775 for mtu in candidates {
776 for _ in 0..self.config.mtu_probe_attempts_per_step {
777 if Instant::now() >= overall_deadline {
778 warn!(
779 server_addr = %self.server_addr,
780 stage = ?HandshakeStage::OpenConnectionRequest1,
781 "client handshake timed out"
782 );
783 return Err(RaknetClientError::HandshakeTimeout {
784 stage: HandshakeStage::OpenConnectionRequest1,
785 });
786 }
787
788 let req1 = OfflinePacket::OpenConnectionRequest1(OpenConnectionRequest1 {
789 protocol_version: self.config.protocol_version,
790 mtu,
791 magic: DEFAULT_UNCONNECTED_MAGIC,
792 });
793 self.send_offline_packet(&req1).await?;
794
795 let per_attempt_deadline = std::cmp::min(
796 overall_deadline,
797 Instant::now() + self.config.mtu_probe_wait_per_attempt,
798 );
799
800 if let Some(reply) = self
801 .wait_for_open_connection_reply1_in_window(per_attempt_deadline)
802 .await?
803 {
804 return Ok(reply);
805 }
806 }
807 }
808
809 warn!(
810 server_addr = %self.server_addr,
811 stage = ?HandshakeStage::OpenConnectionRequest1,
812 "client handshake timed out after mtu probing"
813 );
814 Err(RaknetClientError::HandshakeTimeout {
815 stage: HandshakeStage::OpenConnectionRequest1,
816 })
817 }
818
819 fn mtu_probe_candidates(&self) -> Vec<u16> {
820 let mut out = Vec::new();
821 for candidate in self
822 .config
823 .mtu_probe_order
824 .iter()
825 .copied()
826 .chain([self.config.mtu])
827 {
828 let mtu = candidate
829 .clamp(MINIMUM_MTU_SIZE, MAXIMUM_MTU_SIZE)
830 .max(MINIMUM_MTU_SIZE);
831 if !out.contains(&mtu) {
832 out.push(mtu);
833 }
834 }
835
836 if out.is_empty() {
837 out.push(self.config.mtu);
838 }
839
840 out
841 }
842
843 async fn wait_for_open_connection_reply1_in_window(
844 &mut self,
845 deadline: Instant,
846 ) -> ClientResult<Option<OpenConnectionReply1>> {
847 loop {
848 let packet = match self.recv_packet_until(deadline).await? {
849 Some(packet) => packet,
850 None => return Ok(None),
851 };
852
853 let mut src = &packet[..];
854 let Ok(offline) = OfflinePacket::decode(&mut src) else {
855 continue;
856 };
857
858 if let Some(reason) = offline_rejection_reason(&offline) {
859 return Err(RaknetClientError::OfflineRejected { reason });
860 }
861
862 if let OfflinePacket::OpenConnectionReply1(reply) = offline {
863 return Ok(Some(reply));
864 }
865 }
866 }
867
868 async fn wait_for_open_connection_reply2(
869 &mut self,
870 deadline: Instant,
871 ) -> ClientResult<OpenConnectionReply2> {
872 loop {
873 let packet = match self.recv_packet_until(deadline).await? {
874 Some(packet) => packet,
875 None => {
876 warn!(
877 server_addr = %self.server_addr,
878 stage = ?HandshakeStage::OpenConnectionRequest2,
879 "client handshake timed out waiting for reply2"
880 );
881 return Err(RaknetClientError::HandshakeTimeout {
882 stage: HandshakeStage::OpenConnectionRequest2,
883 });
884 }
885 };
886
887 let mut src = &packet[..];
888 let Ok(offline) = OfflinePacket::decode(&mut src) else {
889 continue;
890 };
891
892 if let Some(reason) = offline_rejection_reason(&offline) {
893 return Err(RaknetClientError::OfflineRejected { reason });
894 }
895
896 if let OfflinePacket::OpenConnectionReply2(reply) = offline {
897 return Ok(reply);
898 }
899 }
900 }
901
902 async fn wait_for_connection_request_accepted(
903 &mut self,
904 deadline: Instant,
905 ) -> ClientResult<ConnectionRequestAccepted> {
906 loop {
907 let packet = match self.recv_packet_until(deadline).await? {
908 Some(packet) => packet,
909 None => {
910 warn!(
911 server_addr = %self.server_addr,
912 stage = ?HandshakeStage::ConnectionRequestAccepted,
913 "client handshake timed out waiting for request accepted"
914 );
915 return Err(RaknetClientError::HandshakeTimeout {
916 stage: HandshakeStage::ConnectionRequestAccepted,
917 });
918 }
919 };
920
921 let mut offline_src = &packet[..];
922 if let Ok(offline) = OfflinePacket::decode(&mut offline_src) {
923 if let Some(reason) = offline_rejection_reason(&offline) {
924 return Err(RaknetClientError::OfflineRejected { reason });
925 }
926 continue;
927 }
928
929 let mut src = &packet[..];
930 let datagram = match Datagram::decode(&mut src) {
931 Ok(datagram) => datagram,
932 Err(_) => continue,
933 };
934
935 if let Some(accepted) = self.process_handshake_datagram(datagram).await? {
936 return Ok(accepted);
937 }
938 }
939 }
940
941 async fn process_handshake_datagram(
942 &mut self,
943 datagram: Datagram,
944 ) -> ClientResult<Option<ConnectionRequestAccepted>> {
945 let now = Instant::now();
946 let frames = self
947 .session
948 .ingest_datagram(datagram, now)
949 .map_err(invalid_data_client_error)?;
950 let _ = self.session.process_incoming_receipts(now);
951
952 let mut accepted = None;
953 for frame in frames {
954 let Some(first) = frame.payload.first().copied() else {
955 continue;
956 };
957 if !is_connected_control_id(first) {
958 continue;
959 }
960
961 let mut control_payload = &frame.payload[..];
962 let control =
963 ConnectedControlPacket::decode(&mut control_payload).map_err(|error| {
964 RaknetClientError::HandshakeProtocolViolation {
965 details: format!("failed to decode handshake control packet: {error}"),
966 }
967 })?;
968
969 match control {
970 ConnectedControlPacket::ConnectionRequestAccepted(pkt) => {
971 accepted = Some(pkt);
972 }
973 ConnectedControlPacket::ConnectedPing(ping) => {
974 self.queue_connected_control_packet(
975 ConnectedControlPacket::ConnectedPong(ConnectedPong {
976 ping_time: ping.ping_time,
977 pong_time: unix_timestamp_millis(),
978 }),
979 Reliability::Unreliable,
980 0,
981 RakPriority::Immediate,
982 )?;
983 }
984 ConnectedControlPacket::DisconnectionNotification(pkt) => {
985 return Err(RaknetClientError::Closed {
986 reason: ClientDisconnectReason::RemoteDisconnectionNotification {
987 reason_code: pkt.reason,
988 },
989 });
990 }
991 ConnectedControlPacket::DetectLostConnection(_) => {
992 return Err(RaknetClientError::Closed {
993 reason: ClientDisconnectReason::RemoteDetectLostConnection,
994 });
995 }
996 ConnectedControlPacket::ConnectedPong(_)
997 | ConnectedControlPacket::ConnectionRequest(_)
998 | ConnectedControlPacket::NewIncomingConnection(_) => {}
999 }
1000 }
1001
1002 self.flush_outbound_with_limits(
1003 self.config.max_new_datagrams_per_recv,
1004 self.config.max_new_bytes_per_recv,
1005 self.config.max_resend_datagrams_per_recv,
1006 self.config.max_resend_bytes_per_recv,
1007 )
1008 .await?;
1009
1010 Ok(accepted)
1011 }
1012
1013 async fn recv_packet_until(&mut self, deadline: Instant) -> ClientResult<Option<Vec<u8>>> {
1014 loop {
1015 let remaining = deadline.saturating_duration_since(Instant::now());
1016 if remaining.is_zero() {
1017 return Ok(None);
1018 }
1019
1020 let recv = match time::timeout(remaining, self.socket.recv_from(&mut self.recv_buffer))
1021 .await
1022 {
1023 Ok(result) => result,
1024 Err(_) => return Ok(None),
1025 }
1026 .map_err(RaknetClientError::from)?;
1027
1028 let (len, addr) = recv;
1029 if addr != self.server_addr {
1030 continue;
1031 }
1032
1033 let mut packet = Vec::with_capacity(len);
1034 packet.extend_from_slice(&self.recv_buffer[..len]);
1035 return Ok(Some(packet));
1036 }
1037 }
1038
1039 async fn process_inbound_packet(&mut self, len: usize) -> ClientResult<()> {
1040 let payload = &self.recv_buffer[..len];
1041 let Some(first) = payload.first().copied() else {
1042 return Ok(());
1043 };
1044
1045 if is_offline_packet_id(first) {
1046 self.pending_events
1047 .push_back(RaknetClientEvent::DecodeError {
1048 error: format!("unexpected offline packet id while connected: 0x{first:02x}"),
1049 });
1050 return Ok(());
1051 }
1052
1053 let mut src = payload;
1054 let datagram = match Datagram::decode(&mut src) {
1055 Ok(datagram) => datagram,
1056 Err(error) => {
1057 self.pending_events
1058 .push_back(RaknetClientEvent::DecodeError {
1059 error: error.to_string(),
1060 });
1061 return Ok(());
1062 }
1063 };
1064
1065 self.process_connected_datagram(datagram).await
1066 }
1067
    /// Feeds one decoded datagram into the session while connected and turns the
    /// results into events.
    ///
    /// In order: the datagram is ingested (a session error becomes a `DecodeError`
    /// event and ends processing), acknowledged receipts become `ReceiptAcked` events,
    /// each frame is either applied as a connected control packet or queued as a
    /// `Packet` event, and finally the outbound queue is flushed with the per-receive
    /// limits.
    ///
    /// # Errors
    ///
    /// Propagates errors from `apply_connected_control` and from the flush.
    async fn process_connected_datagram(&mut self, datagram: Datagram) -> ClientResult<()> {
        let now = Instant::now();
        let frames = match self.session.ingest_datagram(datagram, now) {
            Ok(frames) => frames,
            Err(error) => {
                // Report and drop the datagram; the connection stays open.
                self.pending_events
                    .push_back(RaknetClientEvent::DecodeError {
                        error: error.to_string(),
                    });
                return Ok(());
            }
        };

        let receipts = self.session.process_incoming_receipts(now);
        for receipt_id in receipts.acked_receipt_ids {
            self.pending_events
                .push_back(RaknetClientEvent::ReceiptAcked { receipt_id });
        }

        for frame in frames {
            // Empty frames carry nothing to dispatch.
            let Some(first) = frame.payload.first().copied() else {
                continue;
            };

            if is_connected_control_id(first) {
                let mut control_payload = &frame.payload[..];
                let control = match ConnectedControlPacket::decode(&mut control_payload) {
                    Ok(control) => control,
                    Err(error) => {
                        // A malformed control packet is reported but does not abort
                        // the remaining frames.
                        self.pending_events
                            .push_back(RaknetClientEvent::DecodeError {
                                error: error.to_string(),
                            });
                        continue;
                    }
                };

                self.apply_connected_control(control)?;
                continue;
            }

            // Not a control packet: surface it to the application.
            self.pending_events.push_back(RaknetClientEvent::Packet {
                payload: frame.payload,
                reliability: frame.header.reliability,
                reliable_index: frame.reliable_index,
                sequence_index: frame.sequence_index,
                ordering_index: frame.ordering_index,
                ordering_channel: frame.ordering_channel,
            });
        }

        // NOTE(review): presumably makes pending control output (e.g. ACKs) due
        // immediately so the flush below sends it — confirm against `Session`.
        self.session.force_control_flush_deadlines(now);

        self.flush_outbound_with_limits(
            self.config.max_new_datagrams_per_recv,
            self.config.max_new_bytes_per_recv,
            self.config.max_resend_datagrams_per_recv,
            self.config.max_resend_bytes_per_recv,
        )
        .await
    }
1131
1132 fn apply_connected_control(&mut self, control: ConnectedControlPacket) -> ClientResult<()> {
1133 match control {
1134 ConnectedControlPacket::ConnectedPing(ping) => {
1135 self.queue_connected_control_packet(
1136 ConnectedControlPacket::ConnectedPong(ConnectedPong {
1137 ping_time: ping.ping_time,
1138 pong_time: unix_timestamp_millis(),
1139 }),
1140 Reliability::Unreliable,
1141 0,
1142 RakPriority::Immediate,
1143 )?;
1144 }
1145 ConnectedControlPacket::DisconnectionNotification(pkt) => {
1146 self.finish_close(ClientDisconnectReason::RemoteDisconnectionNotification {
1147 reason_code: pkt.reason,
1148 });
1149 }
1150 ConnectedControlPacket::DetectLostConnection(DetectLostConnection) => {
1151 self.finish_close(ClientDisconnectReason::RemoteDetectLostConnection);
1152 }
1153 ConnectedControlPacket::ConnectionRequest(_)
1154 | ConnectedControlPacket::ConnectionRequestAccepted(_)
1155 | ConnectedControlPacket::NewIncomingConnection(_)
1156 | ConnectedControlPacket::ConnectedPong(_) => {}
1157 }
1158
1159 Ok(())
1160 }
1161
1162 fn queue_connected_control_packet(
1163 &mut self,
1164 packet: ConnectedControlPacket,
1165 reliability: Reliability,
1166 channel: u8,
1167 priority: RakPriority,
1168 ) -> ClientResult<()> {
1169 let mut out = BytesMut::new();
1170 packet.encode(&mut out).map_err(invalid_data_client_error)?;
1171 self.queue_payload_with_optional_receipt(out.freeze(), reliability, channel, priority, None)
1172 }
1173
1174 fn queue_payload_with_optional_receipt(
1175 &mut self,
1176 payload: Bytes,
1177 reliability: Reliability,
1178 channel: u8,
1179 priority: RakPriority,
1180 receipt_id: Option<u64>,
1181 ) -> ClientResult<()> {
1182 let decision = if let Some(receipt_id) = receipt_id {
1183 self.session.queue_payload_with_receipt(
1184 payload,
1185 reliability,
1186 channel,
1187 priority,
1188 Some(receipt_id),
1189 )
1190 } else {
1191 self.session
1192 .queue_payload(payload, reliability, channel, priority)
1193 };
1194
1195 match decision {
1196 QueuePayloadResult::Enqueued { .. } => Ok(()),
1197 QueuePayloadResult::Dropped => Err(RaknetClientError::BackpressureDropped),
1198 QueuePayloadResult::Deferred => Err(RaknetClientError::BackpressureDeferred),
1199 QueuePayloadResult::DisconnectRequested => {
1200 self.finish_close(ClientDisconnectReason::Backpressure);
1201 Err(RaknetClientError::BackpressureDisconnect)
1202 }
1203 }
1204 }
1205
1206 async fn flush_outbound_with_limits(
1207 &mut self,
1208 max_new_datagrams: usize,
1209 max_new_bytes: usize,
1210 max_resend_datagrams: usize,
1211 max_resend_bytes: usize,
1212 ) -> ClientResult<()> {
1213 if self.closed {
1214 return Ok(());
1215 }
1216
1217 self.queue_keepalive_ping();
1218
1219 let now = Instant::now();
1220 let datagrams = self.session.on_tick(
1221 now,
1222 max_new_datagrams,
1223 max_new_bytes,
1224 max_resend_datagrams,
1225 max_resend_bytes,
1226 );
1227
1228 for datagram in &datagrams {
1229 self.send_datagram(datagram).await?;
1230 }
1231
1232 if self.session.take_backpressure_disconnect() {
1233 self.finish_close(ClientDisconnectReason::Backpressure);
1234 return Err(RaknetClientError::BackpressureDisconnect);
1235 }
1236
1237 Ok(())
1238 }
1239
1240 fn queue_keepalive_ping(&mut self) {
1241 let now = Instant::now();
1242 if !self
1243 .session
1244 .should_send_keepalive(now, self.config.session_keepalive_interval)
1245 {
1246 return;
1247 }
1248
1249 let ping = ConnectedControlPacket::ConnectedPing(ConnectedPing {
1250 ping_time: unix_timestamp_millis(),
1251 });
1252
1253 let decision = {
1254 let mut out = BytesMut::new();
1255 if ping.encode(&mut out).is_err() {
1256 return;
1257 }
1258 self.session
1259 .queue_payload(out.freeze(), Reliability::Unreliable, 0, RakPriority::Low)
1260 };
1261
1262 if matches!(decision, QueuePayloadResult::Enqueued { .. }) {
1263 self.session.mark_keepalive_sent(now);
1264 }
1265 }
1266
1267 fn check_idle_timeout_and_close(&mut self) -> bool {
1268 if self.closed
1269 || self.config.session_idle_timeout.is_zero()
1270 || self.session.state() != SessionState::Connected
1271 {
1272 return false;
1273 }
1274
1275 let idle = Instant::now().saturating_duration_since(self.last_inbound_activity);
1276 if idle >= self.config.session_idle_timeout {
1277 self.finish_close(ClientDisconnectReason::IdleTimeout);
1278 return true;
1279 }
1280
1281 false
1282 }
1283
1284 async fn send_offline_packet(&self, packet: &OfflinePacket) -> ClientResult<()> {
1285 let mut out = BytesMut::new();
1286 packet.encode(&mut out).map_err(invalid_data_client_error)?;
1287 let _written = self
1288 .socket
1289 .send_to(&out, self.server_addr)
1290 .await
1291 .map_err(RaknetClientError::from)?;
1292 Ok(())
1293 }
1294
1295 async fn send_datagram(&self, datagram: &Datagram) -> ClientResult<()> {
1296 let mut out = BytesMut::with_capacity(datagram.encoded_size());
1297 datagram
1298 .encode(&mut out)
1299 .map_err(invalid_data_client_error)?;
1300 let _written = self
1301 .socket
1302 .send_to(&out, self.server_addr)
1303 .await
1304 .map_err(RaknetClientError::from)?;
1305 Ok(())
1306 }
1307
1308 fn ensure_open(&self) -> ClientResult<()> {
1309 if self.closed {
1310 return Err(RaknetClientError::Closed {
1311 reason: self.close_reason.clone().unwrap_or(
1312 ClientDisconnectReason::TransportError {
1313 message: "closed without explicit reason".to_string(),
1314 },
1315 ),
1316 });
1317 }
1318 Ok(())
1319 }
1320
1321 fn finish_close(&mut self, reason: ClientDisconnectReason) {
1322 if self.closed {
1323 return;
1324 }
1325
1326 if self.session.state() == SessionState::Connected {
1327 let _ = self.session.transition_to(SessionState::Closing);
1328 }
1329 let _ = self.session.transition_to(SessionState::Closed);
1330
1331 self.close_reason = Some(reason.clone());
1332 self.closed = true;
1333 match &reason {
1334 ClientDisconnectReason::Requested
1335 | ClientDisconnectReason::RemoteDisconnectionNotification { .. }
1336 | ClientDisconnectReason::RemoteDetectLostConnection => {
1337 info!(server_addr = %self.server_addr, ?reason, "client closed")
1338 }
1339 ClientDisconnectReason::Backpressure
1340 | ClientDisconnectReason::IdleTimeout
1341 | ClientDisconnectReason::TransportError { .. } => {
1342 warn!(server_addr = %self.server_addr, ?reason, "client closed")
1343 }
1344 }
1345 self.pending_events
1346 .push_back(RaknetClientEvent::Disconnected { reason });
1347 }
1348}
1349
1350fn should_retry_connect(error: &RaknetClientError, policy: &ReconnectPolicy) -> bool {
1351 match error {
1352 RaknetClientError::OfflineRejected { .. } => !policy.fast_fail_on_offline_rejection,
1353 RaknetClientError::HandshakeTimeout { .. } => policy.retry_on_handshake_timeout,
1354 RaknetClientError::Io { .. } => policy.retry_on_io,
1355 RaknetClientError::InvalidConfig { .. } => false,
1356 RaknetClientError::HandshakeProtocolViolation { .. }
1357 | RaknetClientError::Closed { .. }
1358 | RaknetClientError::BackpressureDropped
1359 | RaknetClientError::BackpressureDeferred
1360 | RaknetClientError::BackpressureDisconnect => false,
1361 }
1362}
1363
1364fn next_backoff(current: Duration, policy: &ReconnectPolicy) -> Duration {
1365 if current.is_zero() {
1366 return Duration::from_millis(1).min(policy.max_backoff);
1367 }
1368
1369 let doubled = current.saturating_mul(2);
1370 if doubled > policy.max_backoff {
1371 policy.max_backoff
1372 } else {
1373 doubled
1374 }
1375}
1376
1377fn invalid_data_client_error<E: std::fmt::Display>(error: E) -> RaknetClientError {
1378 RaknetClientError::HandshakeProtocolViolation {
1379 details: error.to_string(),
1380 }
1381}
1382
1383fn offline_rejection_reason(packet: &OfflinePacket) -> Option<OfflineRejectionReason> {
1384 match packet {
1385 OfflinePacket::IncompatibleProtocolVersion(pkt) => {
1386 Some(OfflineRejectionReason::IncompatibleProtocolVersion {
1387 protocol_version: pkt.protocol_version,
1388 server_guid: pkt.server_guid,
1389 })
1390 }
1391 OfflinePacket::ConnectionRequestFailed(pkt) => {
1392 Some(OfflineRejectionReason::ConnectionRequestFailed {
1393 server_guid: pkt.server_guid,
1394 })
1395 }
1396 OfflinePacket::AlreadyConnected(pkt) => Some(OfflineRejectionReason::AlreadyConnected {
1397 server_guid: pkt.server_guid,
1398 }),
1399 OfflinePacket::NoFreeIncomingConnections(pkt) => {
1400 Some(OfflineRejectionReason::NoFreeIncomingConnections {
1401 server_guid: pkt.server_guid,
1402 })
1403 }
1404 OfflinePacket::ConnectionBanned(pkt) => Some(OfflineRejectionReason::ConnectionBanned {
1405 server_guid: pkt.server_guid,
1406 }),
1407 OfflinePacket::IpRecentlyConnected(pkt) => {
1408 Some(OfflineRejectionReason::IpRecentlyConnected {
1409 server_guid: pkt.server_guid,
1410 })
1411 }
1412 _ => None,
1413 }
1414}
1415
/// Returns `true` when `id` is one of the packet ids this client treats as
/// an offline (unconnected) packet.
fn is_offline_packet_id(id: u8) -> bool {
    const OFFLINE_PACKET_IDS: [u8; 13] = [
        0x01, 0x02, 0x05, 0x06, 0x07, 0x08, 0x11, 0x12, 0x14, 0x17, 0x19, 0x1A, 0x1C,
    ];

    OFFLINE_PACKET_IDS.contains(&id)
}
1422
/// Returns `true` when `id` is one of the packet ids this client treats as
/// a connected-control packet.
fn is_connected_control_id(id: u8) -> bool {
    const CONNECTED_CONTROL_IDS: [u8; 7] = [0x00, 0x03, 0x04, 0x09, 0x10, 0x13, 0x15];

    CONNECTED_CONTROL_IDS.contains(&id)
}
1426
/// Picks the local bind address to use when none is configured: the
/// unspecified address with port 0, in the same IP family as `server_addr`.
///
/// For IPv6 the server's `scope_id` is carried over and flow info is zero.
fn default_bind_addr_for_server(server_addr: SocketAddr) -> SocketAddr {
    match server_addr {
        SocketAddr::V4(_) => {
            let wildcard = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
            wildcard.into()
        }
        SocketAddr::V6(server_v6) => {
            let wildcard =
                SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, server_v6.scope_id());
            wildcard.into()
        }
    }
}
1438
1439fn build_internal_addrs(server_addr: SocketAddr) -> [SocketAddr; SYSTEM_ADDRESS_COUNT] {
1440 let fallback = match server_addr {
1441 SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
1442 SocketAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(
1443 Ipv6Addr::UNSPECIFIED,
1444 0,
1445 0,
1446 v6.scope_id(),
1447 )),
1448 };
1449
1450 let mut addrs = [fallback; SYSTEM_ADDRESS_COUNT];
1451 addrs[0] = server_addr;
1452 addrs
1453}
1454
1455fn random_guid() -> u64 {
1456 let mut bytes = [0u8; 8];
1457 if getrandom::getrandom(&mut bytes).is_ok() {
1458 return u64::from_le_bytes(bytes);
1459 }
1460
1461 let now = unix_timestamp_millis() as u64;
1462 now ^ 0xA5A5_5A5A_DEAD_BEEF
1463}
1464
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch.
///
/// The value is clamped to `i64::MAX`; if the system clock reports a time
/// before the epoch, `0` is returned.
fn unix_timestamp_millis() -> i64 {
    const MAX_MILLIS: u128 = i64::MAX as u128;

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis().min(MAX_MILLIS) as i64)
        .unwrap_or(0)
}
1471
#[cfg(test)]
mod tests {
    //! Unit tests for the free helper functions and the config / policy
    //! validation entry points of the client module.

    use std::time::Duration;

    use super::{
        ClientSendOptions, OfflineRejectionReason, RaknetClientConfig, RaknetClientError,
        ReconnectPolicy, is_connected_control_id, is_offline_packet_id, next_backoff,
        offline_rejection_reason, should_retry_connect,
    };
    use crate::handshake::{ConnectionBanned, OfflinePacket};
    use crate::protocol::constants::DEFAULT_UNCONNECTED_MAGIC;
    use crate::protocol::reliability::Reliability;
    use crate::session::RakPriority;

    /// Spot-checks both id classifiers with ids inside and outside their sets.
    #[test]
    fn id_guards_cover_known_ranges() {
        for offline_id in [0x05u8, 0x1C] {
            assert!(is_offline_packet_id(offline_id));
        }
        assert!(!is_offline_packet_id(0xFF));

        assert!(is_connected_control_id(0x10));
        assert!(!is_connected_control_id(0x11));
    }

    /// The default send options are reliable-ordered, channel 0, high priority.
    #[test]
    fn send_options_default_matches_server_defaults() {
        let ClientSendOptions {
            reliability,
            channel,
            priority,
        } = ClientSendOptions::default();

        assert_eq!(reliability, Reliability::ReliableOrdered);
        assert_eq!(channel, 0);
        assert_eq!(priority, RakPriority::High);
    }

    /// A `ConnectionBanned` offline packet maps to the matching rejection
    /// reason and keeps the server GUID.
    #[test]
    fn rejection_mapping_extracts_reason() {
        let banned = ConnectionBanned {
            server_guid: 7,
            magic: DEFAULT_UNCONNECTED_MAGIC,
        };
        let mapped = offline_rejection_reason(&OfflinePacket::ConnectionBanned(banned));
        let expected = OfflineRejectionReason::ConnectionBanned { server_guid: 7 };

        assert_eq!(mapped, Some(expected));
    }

    /// Offline rejections are not retried under the default policy, but are
    /// once fast-fail is switched off.
    #[test]
    fn retry_policy_fast_fail_respects_offline_rejection() {
        let rejected = RaknetClientError::OfflineRejected {
            reason: OfflineRejectionReason::ConnectionBanned { server_guid: 1 },
        };

        let strict = ReconnectPolicy::default();
        assert!(!should_retry_connect(&rejected, &strict));

        let relaxed = ReconnectPolicy {
            fast_fail_on_offline_rejection: false,
            ..strict
        };
        assert!(should_retry_connect(&rejected, &relaxed));
    }

    /// Backoff doubles until it would exceed `max_backoff`, then clamps.
    #[test]
    fn backoff_growth_respects_cap() {
        let policy = ReconnectPolicy {
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_millis(250),
            ..ReconnectPolicy::default()
        };

        // (current backoff, expected next backoff), both in milliseconds.
        let cases = [(100u64, 200u64), (200, 250)];
        for (current_ms, expected_ms) in cases {
            assert_eq!(
                next_backoff(Duration::from_millis(current_ms), &policy),
                Duration::from_millis(expected_ms)
            );
        }
    }

    /// A config with an explicit MTU and an empty probe order still validates.
    #[test]
    fn mtu_probe_candidates_include_configured_mtu_even_when_order_empty() {
        let cfg = RaknetClientConfig {
            mtu: 1300,
            mtu_probe_order: Vec::new(),
            ..RaknetClientConfig::default()
        };

        assert_eq!(cfg.mtu, 1300);
        cfg.validate().expect("config should be valid");
    }

    /// An MTU of 10 is rejected and the error names the offending field.
    #[test]
    fn client_config_validate_rejects_out_of_range_mtu() {
        let cfg = RaknetClientConfig {
            mtu: 10,
            ..RaknetClientConfig::default()
        };

        let err = cfg
            .validate()
            .expect_err("invalid MTU must be rejected by validate()");

        assert_eq!(err.config, "RaknetClientConfig");
        assert_eq!(err.field, "mtu");
    }

    /// A reconnect policy with zero attempts is rejected and the error names
    /// the offending field.
    #[test]
    fn reconnect_policy_validate_rejects_zero_attempts() {
        let policy = ReconnectPolicy {
            max_attempts: 0,
            ..ReconnectPolicy::default()
        };

        let err = policy
            .validate()
            .expect_err("max_attempts=0 must be rejected");

        assert_eq!(err.config, "ReconnectPolicy");
        assert_eq!(err.field, "max_attempts");
    }
}