1use std::collections::VecDeque;
2use std::future::Future;
3use std::io;
4use std::net::{IpAddr, SocketAddr};
5use std::pin::Pin;
6
7use bytes::Bytes;
8use tracing::{debug, info, warn};
9
10use crate::concurrency::{FastMap, fast_map};
11use crate::error::ConfigValidationError;
12use crate::handshake::OfflinePacket;
13use crate::protocol::reliability::Reliability;
14use crate::protocol::sequence24::Sequence24;
15use crate::session::RakPriority;
16use crate::transport::{
17 RemoteDisconnectReason, ShardedRuntimeConfig, ShardedRuntimeEvent, ShardedRuntimeHandle,
18 ShardedSendPayload, TransportConfig, TransportEvent, TransportMetricsSnapshot,
19 spawn_sharded_runtime,
20};
21
/// Identifier the server assigns to a connected peer.
///
/// A thin newtype over `u64`; convert with [`PeerId::from_u64`] and
/// [`PeerId::as_u64`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct PeerId(u64);

impl PeerId {
    /// Wraps a raw `u64` value as a `PeerId`.
    pub const fn from_u64(value: u64) -> Self {
        PeerId(value)
    }

    /// Returns the raw `u64` value carried by this identifier.
    pub const fn as_u64(self) -> u64 {
        let PeerId(raw) = self;
        raw
    }
}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub struct SendOptions {
37 pub reliability: Reliability,
38 pub channel: u8,
39 pub priority: RakPriority,
40}
41
42impl Default for SendOptions {
43 fn default() -> Self {
44 Self {
45 reliability: Reliability::ReliableOrdered,
46 channel: 0,
47 priority: RakPriority::High,
48 }
49 }
50}
51
/// Why a peer was removed from the server's peer table.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerDisconnectReason {
    /// The local application asked for the disconnect.
    Requested,
    /// The remote side sent a disconnection notification, optionally with a
    /// reason code.
    RemoteDisconnectionNotification { reason_code: Option<u8> },
    /// The transport reported the remote connection as lost.
    RemoteDetectLostConnection,
    /// The shard worker that owned the peer stopped.
    WorkerStopped { shard_id: usize },
}
59
60#[derive(Debug)]
61pub enum RaknetServerEvent {
62 PeerConnected {
63 peer_id: PeerId,
64 addr: SocketAddr,
65 client_guid: u64,
66 shard_id: usize,
67 },
68 PeerDisconnected {
69 peer_id: PeerId,
70 addr: SocketAddr,
71 reason: PeerDisconnectReason,
72 },
73 Packet {
74 peer_id: PeerId,
75 addr: SocketAddr,
76 payload: Bytes,
77 reliability: Reliability,
78 reliable_index: Option<Sequence24>,
79 sequence_index: Option<Sequence24>,
80 ordering_index: Option<Sequence24>,
81 ordering_channel: Option<u8>,
82 },
83 OfflinePacket {
84 addr: SocketAddr,
85 packet: OfflinePacket,
86 },
87 ReceiptAcked {
88 peer_id: PeerId,
89 addr: SocketAddr,
90 receipt_id: u64,
91 },
92 PeerRateLimited {
93 addr: SocketAddr,
94 },
95 SessionLimitReached {
96 addr: SocketAddr,
97 },
98 ProxyDropped {
99 addr: SocketAddr,
100 },
101 DecodeError {
102 addr: SocketAddr,
103 error: String,
104 },
105 WorkerError {
106 shard_id: usize,
107 message: String,
108 },
109 WorkerStopped {
110 shard_id: usize,
111 },
112 Metrics {
113 shard_id: usize,
114 snapshot: Box<TransportMetricsSnapshot>,
115 dropped_non_critical_events: u64,
116 },
117}
118
119impl RaknetServerEvent {
120 pub fn metrics_snapshot(&self) -> Option<(usize, &TransportMetricsSnapshot, u64)> {
121 match self {
122 Self::Metrics {
123 shard_id,
124 snapshot,
125 dropped_non_critical_events,
126 } => Some((*shard_id, snapshot.as_ref(), *dropped_non_critical_events)),
127 _ => None,
128 }
129 }
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub struct ConnectEvent {
134 pub peer_id: PeerId,
135 pub addr: SocketAddr,
136 pub client_guid: u64,
137 pub shard_id: usize,
138}
139
140#[derive(Debug, Clone)]
141pub struct PacketEvent {
142 pub peer_id: PeerId,
143 pub addr: SocketAddr,
144 pub payload: Bytes,
145 pub reliability: Reliability,
146 pub reliable_index: Option<Sequence24>,
147 pub sequence_index: Option<Sequence24>,
148 pub ordering_index: Option<Sequence24>,
149 pub ordering_channel: Option<u8>,
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153pub struct DisconnectEvent {
154 pub peer_id: PeerId,
155 pub addr: SocketAddr,
156 pub reason: PeerDisconnectReason,
157}
158
159pub type ServerHookFuture<'a> = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
160
161type ConnectHandler =
162 Box<dyn for<'a> FnMut(&'a mut RaknetServer, ConnectEvent) -> ServerHookFuture<'a> + Send>;
163type PacketHandler =
164 Box<dyn for<'a> FnMut(&'a mut RaknetServer, PacketEvent) -> ServerHookFuture<'a> + Send>;
165type DisconnectHandler =
166 Box<dyn for<'a> FnMut(&'a mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'a> + Send>;
167
168pub struct ServerFacade<'a> {
169 server: &'a mut RaknetServer,
170 on_connect: Option<ConnectHandler>,
171 on_packet: Option<PacketHandler>,
172 on_disconnect: Option<DisconnectHandler>,
173}
174
175impl<'a> ServerFacade<'a> {
176 pub fn new(server: &'a mut RaknetServer) -> Self {
177 Self {
178 server,
179 on_connect: None,
180 on_packet: None,
181 on_disconnect: None,
182 }
183 }
184
185 pub fn on_connect<F>(mut self, handler: F) -> Self
186 where
187 F: for<'b> FnMut(&'b mut RaknetServer, ConnectEvent) -> ServerHookFuture<'b>
188 + Send
189 + 'static,
190 {
191 self.on_connect = Some(Box::new(handler));
192 self
193 }
194
195 pub fn on_packet<F>(mut self, handler: F) -> Self
196 where
197 F: for<'b> FnMut(&'b mut RaknetServer, PacketEvent) -> ServerHookFuture<'b>
198 + Send
199 + 'static,
200 {
201 self.on_packet = Some(Box::new(handler));
202 self
203 }
204
205 pub fn on_disconnect<F>(mut self, handler: F) -> Self
206 where
207 F: for<'b> FnMut(&'b mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'b>
208 + Send
209 + 'static,
210 {
211 self.on_disconnect = Some(Box::new(handler));
212 self
213 }
214
215 pub async fn next(&mut self) -> io::Result<bool> {
216 let Some(event) = self.server.next_event().await else {
217 return Ok(false);
218 };
219 self.dispatch(event).await?;
220 Ok(true)
221 }
222
223 pub async fn run(&mut self) -> io::Result<()> {
224 while self.next().await? {}
225 Ok(())
226 }
227
228 pub fn server(&self) -> &RaknetServer {
229 self.server
230 }
231
232 pub fn server_mut(&mut self) -> &mut RaknetServer {
233 self.server
234 }
235
236 async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
237 match event {
238 RaknetServerEvent::PeerConnected {
239 peer_id,
240 addr,
241 client_guid,
242 shard_id,
243 } => {
244 if let Some(handler) = self.on_connect.as_mut() {
245 handler(
246 self.server,
247 ConnectEvent {
248 peer_id,
249 addr,
250 client_guid,
251 shard_id,
252 },
253 )
254 .await?;
255 }
256 }
257 RaknetServerEvent::Packet {
258 peer_id,
259 addr,
260 payload,
261 reliability,
262 reliable_index,
263 sequence_index,
264 ordering_index,
265 ordering_channel,
266 } => {
267 if let Some(handler) = self.on_packet.as_mut() {
268 handler(
269 self.server,
270 PacketEvent {
271 peer_id,
272 addr,
273 payload,
274 reliability,
275 reliable_index,
276 sequence_index,
277 ordering_index,
278 ordering_channel,
279 },
280 )
281 .await?;
282 }
283 }
284 RaknetServerEvent::PeerDisconnected {
285 peer_id,
286 addr,
287 reason,
288 } => {
289 if let Some(handler) = self.on_disconnect.as_mut() {
290 handler(
291 self.server,
292 DisconnectEvent {
293 peer_id,
294 addr,
295 reason,
296 },
297 )
298 .await?;
299 }
300 }
301 RaknetServerEvent::OfflinePacket { .. }
302 | RaknetServerEvent::ReceiptAcked { .. }
303 | RaknetServerEvent::PeerRateLimited { .. }
304 | RaknetServerEvent::SessionLimitReached { .. }
305 | RaknetServerEvent::ProxyDropped { .. }
306 | RaknetServerEvent::DecodeError { .. }
307 | RaknetServerEvent::WorkerError { .. }
308 | RaknetServerEvent::WorkerStopped { .. }
309 | RaknetServerEvent::Metrics { .. } => {}
310 }
311
312 Ok(())
313 }
314}
315
316pub trait EventFacadeHandler {
321 fn on_connect<'a>(
322 &'a mut self,
323 _session_id: u64,
324 _addr: IpAddr,
325 _port: u16,
326 _client_guid: u64,
327 ) -> ServerHookFuture<'a> {
328 Box::pin(async { Ok(()) })
329 }
330
331 fn on_disconnect<'a>(
332 &'a mut self,
333 _session_id: u64,
334 _reason: PeerDisconnectReason,
335 ) -> ServerHookFuture<'a> {
336 Box::pin(async { Ok(()) })
337 }
338
339 fn on_packet<'a>(&'a mut self, _session_id: u64, _payload: Bytes) -> ServerHookFuture<'a> {
340 Box::pin(async { Ok(()) })
341 }
342
343 fn on_ack<'a>(&'a mut self, _session_id: u64, _receipt_id: u64) -> ServerHookFuture<'a> {
344 Box::pin(async { Ok(()) })
345 }
346
347 fn on_metrics<'a>(
348 &'a mut self,
349 _shard_id: usize,
350 _snapshot: TransportMetricsSnapshot,
351 _dropped_non_critical_events: u64,
352 ) -> ServerHookFuture<'a> {
353 Box::pin(async { Ok(()) })
354 }
355}
356
357pub async fn dispatch_event_facade<H: EventFacadeHandler>(
358 handler: &mut H,
359 event: RaknetServerEvent,
360) -> io::Result<()> {
361 match event {
362 RaknetServerEvent::PeerConnected {
363 peer_id,
364 addr,
365 client_guid,
366 ..
367 } => {
368 handler
369 .on_connect(peer_id.as_u64(), addr.ip(), addr.port(), client_guid)
370 .await?;
371 }
372 RaknetServerEvent::PeerDisconnected {
373 peer_id, reason, ..
374 } => {
375 handler.on_disconnect(peer_id.as_u64(), reason).await?;
376 }
377 RaknetServerEvent::Packet {
378 peer_id, payload, ..
379 } => {
380 handler.on_packet(peer_id.as_u64(), payload).await?;
381 }
382 RaknetServerEvent::ReceiptAcked {
383 peer_id,
384 receipt_id,
385 ..
386 } => {
387 handler.on_ack(peer_id.as_u64(), receipt_id).await?;
388 }
389 RaknetServerEvent::Metrics {
390 shard_id,
391 snapshot,
392 dropped_non_critical_events,
393 } => {
394 handler
395 .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
396 .await?;
397 }
398 RaknetServerEvent::OfflinePacket { .. }
399 | RaknetServerEvent::PeerRateLimited { .. }
400 | RaknetServerEvent::SessionLimitReached { .. }
401 | RaknetServerEvent::ProxyDropped { .. }
402 | RaknetServerEvent::DecodeError { .. }
403 | RaknetServerEvent::WorkerError { .. }
404 | RaknetServerEvent::WorkerStopped { .. } => {}
405 }
406
407 Ok(())
408}
409
410pub struct EventFacade<'a, H: EventFacadeHandler> {
411 server: &'a mut RaknetServer,
412 handler: &'a mut H,
413}
414
415impl<'a, H: EventFacadeHandler> EventFacade<'a, H> {
416 pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
417 Self { server, handler }
418 }
419
420 pub async fn next(&mut self) -> io::Result<bool> {
421 let Some(event) = self.server.next_event().await else {
422 return Ok(false);
423 };
424 self.dispatch(event).await?;
425 Ok(true)
426 }
427
428 pub async fn run(&mut self) -> io::Result<()> {
429 while self.next().await? {}
430 Ok(())
431 }
432
433 pub fn server(&self) -> &RaknetServer {
434 self.server
435 }
436
437 pub fn server_mut(&mut self) -> &mut RaknetServer {
438 self.server
439 }
440
441 pub fn handler(&self) -> &H {
442 self.handler
443 }
444
445 pub fn handler_mut(&mut self) -> &mut H {
446 self.handler
447 }
448
449 async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
450 dispatch_event_facade(self.handler, event).await
451 }
452}
453
/// Compact 32-bit session identifier handed out by `SessionIdAdapter` in place
/// of the 64-bit `PeerId`.
pub type SessionId = u32;
455
456#[derive(Debug)]
457pub struct SessionIdAdapter {
458 peer_to_session: FastMap<PeerId, SessionId>,
459 session_to_peer: FastMap<SessionId, PeerId>,
460 next_session_id: SessionId,
461}
462
463impl Default for SessionIdAdapter {
464 fn default() -> Self {
465 Self::new()
466 }
467}
468
469impl SessionIdAdapter {
470 pub fn new() -> Self {
471 Self {
472 peer_to_session: fast_map(),
473 session_to_peer: fast_map(),
474 next_session_id: 1,
475 }
476 }
477
478 pub fn len(&self) -> usize {
479 self.peer_to_session.len()
480 }
481
482 pub fn is_empty(&self) -> bool {
483 self.peer_to_session.is_empty()
484 }
485
486 pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
487 self.peer_to_session.get(&peer_id).map(|entry| *entry)
488 }
489
490 pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
491 self.session_to_peer.get(&session_id).map(|entry| *entry)
492 }
493
494 pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
495 let session_id = Self::session_id_from_i32(session_id)?;
496 self.peer_id_for_session(session_id)
497 }
498
499 pub fn register_peer(&mut self, peer_id: PeerId) -> io::Result<SessionId> {
500 if let Some(existing) = self.session_id_for_peer(peer_id) {
501 return Ok(existing);
502 }
503
504 let session_id = self.allocate_session_id()?;
505 self.peer_to_session.insert(peer_id, session_id);
506 self.session_to_peer.insert(session_id, peer_id);
507 Ok(session_id)
508 }
509
510 pub fn unregister_peer(&mut self, peer_id: PeerId) -> Option<SessionId> {
511 let (_, session_id) = self.peer_to_session.remove(&peer_id)?;
512 self.session_to_peer.remove(&session_id);
513 Some(session_id)
514 }
515
516 pub fn clear(&mut self) {
517 self.peer_to_session.clear();
518 self.session_to_peer.clear();
519 self.next_session_id = 1;
520 }
521
522 pub fn session_id_to_i32(session_id: SessionId) -> Option<i32> {
523 i32::try_from(session_id).ok()
524 }
525
526 pub fn session_id_from_i32(session_id: i32) -> Option<SessionId> {
527 u32::try_from(session_id).ok()
528 }
529
530 fn allocate_session_id(&mut self) -> io::Result<SessionId> {
531 let mut candidate = if self.next_session_id == 0 {
532 1
533 } else {
534 self.next_session_id
535 };
536
537 for _ in 0..u32::MAX {
538 if !self.session_to_peer.contains_key(&candidate) {
539 self.next_session_id = candidate.wrapping_add(1);
540 if self.next_session_id == 0 {
541 self.next_session_id = 1;
542 }
543 return Ok(candidate);
544 }
545
546 candidate = candidate.wrapping_add(1);
547 if candidate == 0 {
548 candidate = 1;
549 }
550 }
551
552 Err(io::Error::other("session id space exhausted"))
553 }
554}
555
556pub trait SessionFacadeHandler {
557 fn on_connect<'a>(
558 &'a mut self,
559 _session_id: SessionId,
560 _addr: IpAddr,
561 _port: u16,
562 _client_guid: u64,
563 ) -> ServerHookFuture<'a> {
564 Box::pin(async { Ok(()) })
565 }
566
567 fn on_disconnect<'a>(
568 &'a mut self,
569 _session_id: SessionId,
570 _reason: PeerDisconnectReason,
571 ) -> ServerHookFuture<'a> {
572 Box::pin(async { Ok(()) })
573 }
574
575 fn on_packet<'a>(
576 &'a mut self,
577 _session_id: SessionId,
578 _payload: Bytes,
579 ) -> ServerHookFuture<'a> {
580 Box::pin(async { Ok(()) })
581 }
582
583 fn on_ack<'a>(&'a mut self, _session_id: SessionId, _receipt_id: u64) -> ServerHookFuture<'a> {
584 Box::pin(async { Ok(()) })
585 }
586
587 fn on_metrics<'a>(
588 &'a mut self,
589 _shard_id: usize,
590 _snapshot: TransportMetricsSnapshot,
591 _dropped_non_critical_events: u64,
592 ) -> ServerHookFuture<'a> {
593 Box::pin(async { Ok(()) })
594 }
595}
596
597pub async fn dispatch_session_facade<H: SessionFacadeHandler>(
598 adapter: &mut SessionIdAdapter,
599 handler: &mut H,
600 event: RaknetServerEvent,
601) -> io::Result<()> {
602 match event {
603 RaknetServerEvent::PeerConnected {
604 peer_id,
605 addr,
606 client_guid,
607 ..
608 } => {
609 let session_id = adapter.register_peer(peer_id)?;
610 handler
611 .on_connect(session_id, addr.ip(), addr.port(), client_guid)
612 .await?;
613 }
614 RaknetServerEvent::PeerDisconnected {
615 peer_id, reason, ..
616 } => {
617 if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
618 handler.on_disconnect(session_id, reason).await?;
619 adapter.unregister_peer(peer_id);
620 } else {
621 debug!(
622 peer_id = peer_id.as_u64(),
623 ?reason,
624 "ignoring disconnect for unknown session id mapping"
625 );
626 }
627 }
628 RaknetServerEvent::Packet {
629 peer_id, payload, ..
630 } => {
631 if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
632 handler.on_packet(session_id, payload).await?;
633 } else {
634 debug!(
635 peer_id = peer_id.as_u64(),
636 "dropping packet callback because session id mapping is missing"
637 );
638 }
639 }
640 RaknetServerEvent::ReceiptAcked {
641 peer_id,
642 receipt_id,
643 ..
644 } => {
645 if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
646 handler.on_ack(session_id, receipt_id).await?;
647 } else {
648 debug!(
649 peer_id = peer_id.as_u64(),
650 receipt_id, "dropping ack callback because session id mapping is missing"
651 );
652 }
653 }
654 RaknetServerEvent::Metrics {
655 shard_id,
656 snapshot,
657 dropped_non_critical_events,
658 } => {
659 handler
660 .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
661 .await?;
662 }
663 RaknetServerEvent::OfflinePacket { .. }
664 | RaknetServerEvent::PeerRateLimited { .. }
665 | RaknetServerEvent::SessionLimitReached { .. }
666 | RaknetServerEvent::ProxyDropped { .. }
667 | RaknetServerEvent::DecodeError { .. }
668 | RaknetServerEvent::WorkerError { .. }
669 | RaknetServerEvent::WorkerStopped { .. } => {}
670 }
671
672 Ok(())
673}
674
675pub struct SessionFacade<'a, H: SessionFacadeHandler> {
676 server: &'a mut RaknetServer,
677 handler: &'a mut H,
678 adapter: SessionIdAdapter,
679}
680
681impl<'a, H: SessionFacadeHandler> SessionFacade<'a, H> {
682 pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
683 Self {
684 server,
685 handler,
686 adapter: SessionIdAdapter::new(),
687 }
688 }
689
690 pub fn with_adapter(
691 server: &'a mut RaknetServer,
692 handler: &'a mut H,
693 adapter: SessionIdAdapter,
694 ) -> Self {
695 Self {
696 server,
697 handler,
698 adapter,
699 }
700 }
701
702 pub async fn next(&mut self) -> io::Result<bool> {
703 let Some(event) = self.server.next_event().await else {
704 return Ok(false);
705 };
706 self.dispatch(event).await?;
707 Ok(true)
708 }
709
710 pub async fn run(&mut self) -> io::Result<()> {
711 while self.next().await? {}
712 Ok(())
713 }
714
715 pub fn server(&self) -> &RaknetServer {
716 self.server
717 }
718
719 pub fn server_mut(&mut self) -> &mut RaknetServer {
720 self.server
721 }
722
723 pub fn handler(&self) -> &H {
724 self.handler
725 }
726
727 pub fn handler_mut(&mut self) -> &mut H {
728 self.handler
729 }
730
731 pub fn adapter(&self) -> &SessionIdAdapter {
732 &self.adapter
733 }
734
735 pub fn adapter_mut(&mut self) -> &mut SessionIdAdapter {
736 &mut self.adapter
737 }
738
739 pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
740 self.adapter.session_id_for_peer(peer_id)
741 }
742
743 pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
744 self.adapter.peer_id_for_session(session_id)
745 }
746
747 pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
748 self.adapter.peer_id_for_session_i32(session_id)
749 }
750
751 pub async fn send(
752 &mut self,
753 session_id: SessionId,
754 payload: impl Into<Bytes>,
755 ) -> io::Result<()> {
756 let peer_id = self.resolve_peer_id(session_id)?;
757 self.server.send(peer_id, payload).await
758 }
759
760 pub async fn send_with_options(
761 &mut self,
762 session_id: SessionId,
763 payload: impl Into<Bytes>,
764 options: SendOptions,
765 ) -> io::Result<()> {
766 let peer_id = self.resolve_peer_id(session_id)?;
767 self.server
768 .send_with_options(peer_id, payload, options)
769 .await
770 }
771
772 pub async fn send_with_receipt(
773 &mut self,
774 session_id: SessionId,
775 payload: impl Into<Bytes>,
776 receipt_id: u64,
777 ) -> io::Result<()> {
778 let peer_id = self.resolve_peer_id(session_id)?;
779 self.server
780 .send_with_receipt(peer_id, payload, receipt_id)
781 .await
782 }
783
784 pub async fn disconnect(&mut self, session_id: SessionId) -> io::Result<()> {
785 let peer_id = self.resolve_peer_id(session_id)?;
786 self.server.disconnect(peer_id).await
787 }
788
789 async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
790 dispatch_session_facade(&mut self.adapter, self.handler, event).await
791 }
792
793 fn resolve_peer_id(&self, session_id: SessionId) -> io::Result<PeerId> {
794 self.peer_id_for_session(session_id).ok_or_else(|| {
795 io::Error::new(
796 io::ErrorKind::NotFound,
797 format!("session id {session_id} is not mapped to any peer"),
798 )
799 })
800 }
801}
802
803#[derive(Debug, Clone, Default)]
804pub struct RaknetServerBuilder {
805 transport_config: TransportConfig,
806 runtime_config: ShardedRuntimeConfig,
807}
808
809impl RaknetServerBuilder {
810 pub fn transport_config(mut self, config: TransportConfig) -> Self {
811 self.transport_config = config;
812 self
813 }
814
815 pub fn runtime_config(mut self, config: ShardedRuntimeConfig) -> Self {
816 self.runtime_config = config;
817 self
818 }
819
820 pub fn bind_addr(mut self, bind_addr: SocketAddr) -> Self {
821 self.transport_config.bind_addr = bind_addr;
822 self
823 }
824
825 pub fn shard_count(mut self, shard_count: usize) -> Self {
826 self.runtime_config.shard_count = shard_count.max(1);
827 self
828 }
829
830 pub fn transport_config_mut(&mut self) -> &mut TransportConfig {
831 &mut self.transport_config
832 }
833
834 pub fn runtime_config_mut(&mut self) -> &mut ShardedRuntimeConfig {
835 &mut self.runtime_config
836 }
837
838 pub async fn start(self) -> io::Result<RaknetServer> {
839 self.transport_config
840 .validate()
841 .map_err(invalid_config_io_error)?;
842 self.runtime_config
843 .validate()
844 .map_err(invalid_config_io_error)?;
845 RaknetServer::start_with_configs(self.transport_config, self.runtime_config).await
846 }
847}
848
849#[derive(Debug, Clone, Copy)]
850struct PeerBinding {
851 peer_id: PeerId,
852 shard_id: usize,
853}
854
855pub struct RaknetServer {
856 runtime: ShardedRuntimeHandle,
857 peers_by_addr: FastMap<SocketAddr, PeerBinding>,
858 addrs_by_peer: FastMap<PeerId, SocketAddr>,
859 pending_events: VecDeque<RaknetServerEvent>,
860 next_peer_id: u64,
861}
862
863impl RaknetServer {
864 pub fn builder() -> RaknetServerBuilder {
865 RaknetServerBuilder::default()
866 }
867
868 pub async fn bind(bind_addr: SocketAddr) -> io::Result<Self> {
869 Self::builder().bind_addr(bind_addr).start().await
870 }
871
872 pub fn facade(&mut self) -> ServerFacade<'_> {
873 ServerFacade::new(self)
874 }
875
876 pub fn event_facade<'a, H: EventFacadeHandler>(
877 &'a mut self,
878 handler: &'a mut H,
879 ) -> EventFacade<'a, H> {
880 EventFacade::new(self, handler)
881 }
882
883 pub fn session_facade<'a, H: SessionFacadeHandler>(
884 &'a mut self,
885 handler: &'a mut H,
886 ) -> SessionFacade<'a, H> {
887 SessionFacade::new(self, handler)
888 }
889
890 pub async fn start_with_configs(
891 transport_config: TransportConfig,
892 runtime_config: ShardedRuntimeConfig,
893 ) -> io::Result<Self> {
894 transport_config
895 .validate()
896 .map_err(invalid_config_io_error)?;
897 runtime_config.validate().map_err(invalid_config_io_error)?;
898 let runtime = spawn_sharded_runtime(transport_config, runtime_config).await?;
899 Ok(Self {
900 runtime,
901 peers_by_addr: fast_map(),
902 addrs_by_peer: fast_map(),
903 pending_events: VecDeque::new(),
904 next_peer_id: 1,
905 })
906 }
907
908 pub fn peer_addr(&self, peer_id: PeerId) -> Option<SocketAddr> {
909 self.addrs_by_peer.get(&peer_id).map(|addr| *addr)
910 }
911
912 pub fn peer_shard(&self, peer_id: PeerId) -> Option<usize> {
913 let addr = self.addrs_by_peer.get(&peer_id).map(|addr| *addr)?;
914 self.peers_by_addr
915 .get(&addr)
916 .map(|binding| binding.shard_id)
917 }
918
919 pub fn peer_id_for_addr(&self, addr: SocketAddr) -> Option<PeerId> {
920 self.peers_by_addr.get(&addr).map(|binding| binding.peer_id)
921 }
922
923 pub async fn send(&self, peer_id: PeerId, payload: impl Into<Bytes>) -> io::Result<()> {
924 self.send_with_options(peer_id, payload, SendOptions::default())
925 .await
926 }
927
928 pub async fn send_with_options(
929 &self,
930 peer_id: PeerId,
931 payload: impl Into<Bytes>,
932 options: SendOptions,
933 ) -> io::Result<()> {
934 let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
935
936 self.runtime
937 .send_payload_to_shard(
938 shard_id,
939 ShardedSendPayload {
940 addr,
941 payload: payload.into(),
942 reliability: options.reliability,
943 channel: options.channel,
944 priority: options.priority,
945 },
946 )
947 .await
948 }
949
950 pub async fn send_with_receipt(
951 &self,
952 peer_id: PeerId,
953 payload: impl Into<Bytes>,
954 receipt_id: u64,
955 ) -> io::Result<()> {
956 self.send_with_options_and_receipt(peer_id, payload, SendOptions::default(), receipt_id)
957 .await
958 }
959
960 pub async fn send_with_options_and_receipt(
961 &self,
962 peer_id: PeerId,
963 payload: impl Into<Bytes>,
964 options: SendOptions,
965 receipt_id: u64,
966 ) -> io::Result<()> {
967 let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
968
969 self.runtime
970 .send_payload_to_shard_with_receipt(
971 shard_id,
972 ShardedSendPayload {
973 addr,
974 payload: payload.into(),
975 reliability: options.reliability,
976 channel: options.channel,
977 priority: options.priority,
978 },
979 receipt_id,
980 )
981 .await
982 }
983
984 pub async fn disconnect(&mut self, peer_id: PeerId) -> io::Result<()> {
985 let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
986 info!(
987 peer_id = peer_id.as_u64(),
988 %addr,
989 shard_id,
990 "server disconnect requested"
991 );
992
993 self.runtime
994 .disconnect_peer_from_shard(shard_id, addr)
995 .await?;
996 self.remove_peer(addr);
997 self.pending_events
998 .push_back(RaknetServerEvent::PeerDisconnected {
999 peer_id,
1000 addr,
1001 reason: PeerDisconnectReason::Requested,
1002 });
1003 Ok(())
1004 }
1005
1006 pub async fn next_event(&mut self) -> Option<RaknetServerEvent> {
1007 if let Some(event) = self.pending_events.pop_front() {
1008 return Some(event);
1009 }
1010
1011 loop {
1012 let runtime_event = self.runtime.event_rx.recv().await?;
1013 self.enqueue_runtime_event(runtime_event);
1014 if let Some(event) = self.pending_events.pop_front() {
1015 return Some(event);
1016 }
1017 }
1018 }
1019
1020 pub async fn shutdown(self) -> io::Result<()> {
1021 self.runtime.shutdown().await
1022 }
1023
1024 fn enqueue_runtime_event(&mut self, runtime_event: ShardedRuntimeEvent) {
1025 match runtime_event {
1026 ShardedRuntimeEvent::Transport { shard_id, event } => match event {
1027 TransportEvent::PeerDisconnected { addr, reason } => {
1028 if let Some(peer_id) = self.remove_peer(addr) {
1029 let reason = match reason {
1030 RemoteDisconnectReason::DisconnectionNotification { reason_code } => {
1031 PeerDisconnectReason::RemoteDisconnectionNotification {
1032 reason_code,
1033 }
1034 }
1035 RemoteDisconnectReason::DetectLostConnection => {
1036 PeerDisconnectReason::RemoteDetectLostConnection
1037 }
1038 };
1039 info!(
1040 peer_id = peer_id.as_u64(),
1041 %addr,
1042 ?reason,
1043 "peer disconnected"
1044 );
1045 self.pending_events
1046 .push_back(RaknetServerEvent::PeerDisconnected {
1047 peer_id,
1048 addr,
1049 reason,
1050 });
1051 } else {
1052 debug!(
1053 %addr,
1054 ?reason,
1055 "received peer disconnect for unknown address"
1056 );
1057 }
1058 }
1059 TransportEvent::ConnectedFrames {
1060 addr,
1061 client_guid,
1062 frames,
1063 receipts,
1064 ..
1065 } => {
1066 let has_frames = !frames.is_empty();
1067 let has_receipts = !receipts.acked_receipt_ids.is_empty();
1068
1069 if client_guid.is_none() && !has_frames && !has_receipts {
1070 debug!(
1071 %addr,
1072 shard_id,
1073 "ignoring pre-connect transport event without frames/receipts"
1074 );
1075 } else {
1076 let (peer_id, is_new) = self.ensure_peer(addr, shard_id);
1077 if is_new {
1078 let client_guid = client_guid.unwrap_or(peer_id.as_u64());
1079 info!(
1080 peer_id = peer_id.as_u64(),
1081 %addr,
1082 client_guid,
1083 shard_id,
1084 "peer connected"
1085 );
1086 self.pending_events
1087 .push_back(RaknetServerEvent::PeerConnected {
1088 peer_id,
1089 addr,
1090 client_guid,
1091 shard_id,
1092 });
1093 }
1094
1095 for frame in frames {
1096 self.pending_events.push_back(RaknetServerEvent::Packet {
1097 peer_id,
1098 addr,
1099 payload: frame.payload,
1100 reliability: frame.reliability,
1101 reliable_index: frame.reliable_index,
1102 sequence_index: frame.sequence_index,
1103 ordering_index: frame.ordering_index,
1104 ordering_channel: frame.ordering_channel,
1105 });
1106 }
1107
1108 for receipt_id in receipts.acked_receipt_ids {
1109 self.pending_events
1110 .push_back(RaknetServerEvent::ReceiptAcked {
1111 peer_id,
1112 addr,
1113 receipt_id,
1114 });
1115 }
1116 }
1117 }
1118 TransportEvent::RateLimited { addr } => {
1119 warn!(%addr, "peer rate-limited");
1120 self.pending_events
1121 .push_back(RaknetServerEvent::PeerRateLimited { addr });
1122 }
1123 TransportEvent::SessionLimitReached { addr } => {
1124 warn!(%addr, "session limit reached");
1125 self.pending_events
1126 .push_back(RaknetServerEvent::SessionLimitReached { addr });
1127 }
1128 TransportEvent::ConnectedDatagramDroppedNoSession { .. } => {}
1129 TransportEvent::ProxyDropped { addr } => {
1130 debug!(%addr, "proxy router dropped packet");
1131 self.pending_events
1132 .push_back(RaknetServerEvent::ProxyDropped { addr });
1133 }
1134 TransportEvent::DecodeError { addr, error } => {
1135 warn!(%addr, %error, "transport decode error");
1136 self.pending_events
1137 .push_back(RaknetServerEvent::DecodeError {
1138 addr,
1139 error: error.to_string(),
1140 });
1141 }
1142 TransportEvent::OfflinePacket { addr, packet } => {
1143 self.pending_events
1144 .push_back(RaknetServerEvent::OfflinePacket { addr, packet });
1145 }
1146 },
1147 ShardedRuntimeEvent::Metrics {
1148 shard_id,
1149 snapshot,
1150 dropped_non_critical_events,
1151 } => {
1152 if dropped_non_critical_events > 0 {
1153 debug!(
1154 shard_id,
1155 dropped_non_critical_events,
1156 "non-critical runtime events were dropped before metrics emit"
1157 );
1158 }
1159 self.pending_events.push_back(RaknetServerEvent::Metrics {
1160 shard_id,
1161 snapshot,
1162 dropped_non_critical_events,
1163 });
1164 }
1165 ShardedRuntimeEvent::WorkerError { shard_id, message } => {
1166 warn!(shard_id, %message, "runtime worker error");
1167 self.pending_events
1168 .push_back(RaknetServerEvent::WorkerError { shard_id, message });
1169 }
1170 ShardedRuntimeEvent::WorkerStopped { shard_id } => {
1171 warn!(shard_id, "runtime worker stopped");
1172 let mut disconnected = Vec::new();
1173 for peer in self.peers_by_addr.iter() {
1174 let addr = *peer.key();
1175 let binding = *peer.value();
1176 if binding.shard_id == shard_id {
1177 disconnected.push((addr, binding.peer_id));
1178 }
1179 }
1180 for (addr, peer_id) in disconnected {
1181 self.remove_peer(addr);
1182 info!(
1183 peer_id = peer_id.as_u64(),
1184 %addr,
1185 shard_id,
1186 "peer disconnected because worker stopped"
1187 );
1188 self.pending_events
1189 .push_back(RaknetServerEvent::PeerDisconnected {
1190 peer_id,
1191 addr,
1192 reason: PeerDisconnectReason::WorkerStopped { shard_id },
1193 });
1194 }
1195 self.pending_events
1196 .push_back(RaknetServerEvent::WorkerStopped { shard_id });
1197 }
1198 }
1199 }
1200
1201 fn ensure_peer(&mut self, addr: SocketAddr, shard_id: usize) -> (PeerId, bool) {
1202 if let Some(mut binding) = self.peers_by_addr.get_mut(&addr) {
1203 if binding.shard_id != shard_id {
1204 binding.shard_id = shard_id;
1205 }
1206 return (binding.peer_id, false);
1207 }
1208
1209 let peer_id = PeerId(self.next_peer_id);
1210 self.next_peer_id = self.next_peer_id.saturating_add(1);
1211 self.peers_by_addr
1212 .insert(addr, PeerBinding { peer_id, shard_id });
1213 self.addrs_by_peer.insert(peer_id, addr);
1214 (peer_id, true)
1215 }
1216
1217 fn remove_peer(&mut self, addr: SocketAddr) -> Option<PeerId> {
1218 let (_, binding) = self.peers_by_addr.remove(&addr)?;
1219 self.addrs_by_peer.remove(&binding.peer_id);
1220 Some(binding.peer_id)
1221 }
1222
1223 fn resolve_peer_route(&self, peer_id: PeerId) -> io::Result<(SocketAddr, usize)> {
1224 let addr = self
1225 .addrs_by_peer
1226 .get(&peer_id)
1227 .map(|entry| *entry)
1228 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer id not found"))?;
1229 let shard_id = self
1230 .peers_by_addr
1231 .get(&addr)
1232 .map(|binding| binding.shard_id)
1233 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer shard binding missing"))?;
1234 Ok((addr, shard_id))
1235 }
1236}
1237
1238fn invalid_config_io_error(error: ConfigValidationError) -> io::Error {
1239 io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
1240}
1241
1242#[cfg(test)]
1243mod tests {
1244 use super::{
1245 EventFacadeHandler, PeerDisconnectReason, PeerId, RaknetServer, RaknetServerBuilder,
1246 RaknetServerEvent, ServerHookFuture, SessionFacadeHandler, SessionId, SessionIdAdapter,
1247 dispatch_event_facade, dispatch_session_facade,
1248 };
1249 use crate::protocol::reliability::Reliability;
1250 use crate::transport::{ShardedRuntimeConfig, TransportConfig, TransportMetricsSnapshot};
1251 use bytes::Bytes;
1252 use std::io;
1253 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1254
/// The `shard_count` builder mutator must store the value in the runtime config.
#[test]
fn builder_mutators_keep_values() {
    let configured = RaknetServerBuilder::default().shard_count(4);
    let stored_shards = configured.runtime_config.shard_count;
    assert_eq!(stored_shards, 4);
}
1260
/// A raw value wrapped with `from_u64` must come back unchanged from `as_u64`.
#[test]
fn peer_id_roundtrip() {
    let raw = 42;
    assert_eq!(PeerId::from_u64(raw).as_u64(), raw);
}
1266
/// Compile-time smoke test: `RaknetServer::builder()` is callable from here.
#[test]
fn builder_type_is_exposed() {
    let _builder = RaknetServer::builder();
}
1271
1272 #[tokio::test]
1273 async fn start_with_invalid_runtime_config_fails_fast() {
1274 let transport = TransportConfig {
1275 bind_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1276 ..TransportConfig::default()
1277 };
1278 let runtime = ShardedRuntimeConfig {
1279 shard_count: 0,
1280 ..ShardedRuntimeConfig::default()
1281 };
1282
1283 match RaknetServer::start_with_configs(transport, runtime).await {
1284 Ok(_) => panic!("invalid config must fail before runtime start"),
1285 Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
1286 }
1287 }
1288
1289 #[derive(Default)]
1290 struct CountingEventHandler {
1291 connect_calls: usize,
1292 disconnect_calls: usize,
1293 packet_calls: usize,
1294 ack_calls: usize,
1295 metrics_calls: usize,
1296 last_connect: Option<(u64, IpAddr, u16, u64)>,
1297 last_disconnect: Option<(u64, PeerDisconnectReason)>,
1298 last_packet: Option<(u64, Bytes)>,
1299 last_ack: Option<(u64, u64)>,
1300 last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1301 }
1302
1303 impl EventFacadeHandler for CountingEventHandler {
1304 fn on_connect<'a>(
1305 &'a mut self,
1306 session_id: u64,
1307 addr: IpAddr,
1308 port: u16,
1309 client_guid: u64,
1310 ) -> ServerHookFuture<'a> {
1311 self.connect_calls = self.connect_calls.saturating_add(1);
1312 self.last_connect = Some((session_id, addr, port, client_guid));
1313 Box::pin(async { Ok(()) })
1314 }
1315
1316 fn on_disconnect<'a>(
1317 &'a mut self,
1318 session_id: u64,
1319 reason: PeerDisconnectReason,
1320 ) -> ServerHookFuture<'a> {
1321 self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1322 self.last_disconnect = Some((session_id, reason));
1323 Box::pin(async { Ok(()) })
1324 }
1325
1326 fn on_packet<'a>(&'a mut self, session_id: u64, payload: Bytes) -> ServerHookFuture<'a> {
1327 self.packet_calls = self.packet_calls.saturating_add(1);
1328 self.last_packet = Some((session_id, payload));
1329 Box::pin(async { Ok(()) })
1330 }
1331
1332 fn on_ack<'a>(&'a mut self, session_id: u64, receipt_id: u64) -> ServerHookFuture<'a> {
1333 self.ack_calls = self.ack_calls.saturating_add(1);
1334 self.last_ack = Some((session_id, receipt_id));
1335 Box::pin(async { Ok(()) })
1336 }
1337
1338 fn on_metrics<'a>(
1339 &'a mut self,
1340 shard_id: usize,
1341 snapshot: TransportMetricsSnapshot,
1342 dropped_non_critical_events: u64,
1343 ) -> ServerHookFuture<'a> {
1344 self.metrics_calls = self.metrics_calls.saturating_add(1);
1345 self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1346 Box::pin(async { Ok(()) })
1347 }
1348 }
1349
1350 #[tokio::test]
1351 async fn dispatch_event_facade_maps_callbacks() -> io::Result<()> {
1352 let mut handler = CountingEventHandler::default();
1353 let peer_id = PeerId::from_u64(77);
1354 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)), 19132);
1355 let payload = Bytes::from_static(b"\x01\x02event");
1356 let metrics = TransportMetricsSnapshot {
1357 packets_forwarded_total: 33,
1358 bytes_forwarded_total: 1200,
1359 ..TransportMetricsSnapshot::default()
1360 };
1361
1362 dispatch_event_facade(
1363 &mut handler,
1364 RaknetServerEvent::PeerConnected {
1365 peer_id,
1366 addr,
1367 client_guid: 0xAABB_CCDD_EEFF_0011,
1368 shard_id: 2,
1369 },
1370 )
1371 .await?;
1372
1373 dispatch_event_facade(
1374 &mut handler,
1375 RaknetServerEvent::Packet {
1376 peer_id,
1377 addr,
1378 payload: payload.clone(),
1379 reliability: Reliability::ReliableOrdered,
1380 reliable_index: None,
1381 sequence_index: None,
1382 ordering_index: None,
1383 ordering_channel: None,
1384 },
1385 )
1386 .await?;
1387
1388 dispatch_event_facade(
1389 &mut handler,
1390 RaknetServerEvent::ReceiptAcked {
1391 peer_id,
1392 addr,
1393 receipt_id: 9001,
1394 },
1395 )
1396 .await?;
1397
1398 dispatch_event_facade(
1399 &mut handler,
1400 RaknetServerEvent::Metrics {
1401 shard_id: 2,
1402 snapshot: Box::new(metrics),
1403 dropped_non_critical_events: 5,
1404 },
1405 )
1406 .await?;
1407
1408 dispatch_event_facade(
1409 &mut handler,
1410 RaknetServerEvent::PeerDisconnected {
1411 peer_id,
1412 addr,
1413 reason: PeerDisconnectReason::Requested,
1414 },
1415 )
1416 .await?;
1417
1418 dispatch_event_facade(
1419 &mut handler,
1420 RaknetServerEvent::WorkerStopped { shard_id: 0 },
1421 )
1422 .await?;
1423
1424 assert_eq!(handler.connect_calls, 1);
1425 assert_eq!(handler.packet_calls, 1);
1426 assert_eq!(handler.ack_calls, 1);
1427 assert_eq!(handler.metrics_calls, 1);
1428 assert_eq!(handler.disconnect_calls, 1);
1429 assert_eq!(
1430 handler.last_connect,
1431 Some((77, addr.ip(), addr.port(), 0xAABB_CCDD_EEFF_0011))
1432 );
1433 assert_eq!(handler.last_packet, Some((77, payload)));
1434 assert_eq!(handler.last_ack, Some((77, 9001)));
1435 let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1436 .last_metrics
1437 .expect("metrics callback should store last snapshot");
1438 assert_eq!(metrics_shard, 2);
1439 assert_eq!(metrics_dropped, 5);
1440 assert_eq!(metrics_snapshot.packets_forwarded_total, 33);
1441 assert_eq!(metrics_snapshot.bytes_forwarded_total, 1200);
1442 assert_eq!(
1443 handler.last_disconnect,
1444 Some((77, PeerDisconnectReason::Requested))
1445 );
1446
1447 Ok(())
1448 }
1449
1450 #[derive(Default)]
1451 struct CountingSessionHandler {
1452 connect_calls: usize,
1453 disconnect_calls: usize,
1454 packet_calls: usize,
1455 ack_calls: usize,
1456 metrics_calls: usize,
1457 last_connect: Option<(SessionId, IpAddr, u16, u64)>,
1458 last_disconnect: Option<(SessionId, PeerDisconnectReason)>,
1459 last_packet: Option<(SessionId, Bytes)>,
1460 last_ack: Option<(SessionId, u64)>,
1461 last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1462 }
1463
1464 impl SessionFacadeHandler for CountingSessionHandler {
1465 fn on_connect<'a>(
1466 &'a mut self,
1467 session_id: SessionId,
1468 addr: IpAddr,
1469 port: u16,
1470 client_guid: u64,
1471 ) -> ServerHookFuture<'a> {
1472 self.connect_calls = self.connect_calls.saturating_add(1);
1473 self.last_connect = Some((session_id, addr, port, client_guid));
1474 Box::pin(async { Ok(()) })
1475 }
1476
1477 fn on_disconnect<'a>(
1478 &'a mut self,
1479 session_id: SessionId,
1480 reason: PeerDisconnectReason,
1481 ) -> ServerHookFuture<'a> {
1482 self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1483 self.last_disconnect = Some((session_id, reason));
1484 Box::pin(async { Ok(()) })
1485 }
1486
1487 fn on_packet<'a>(
1488 &'a mut self,
1489 session_id: SessionId,
1490 payload: Bytes,
1491 ) -> ServerHookFuture<'a> {
1492 self.packet_calls = self.packet_calls.saturating_add(1);
1493 self.last_packet = Some((session_id, payload));
1494 Box::pin(async { Ok(()) })
1495 }
1496
1497 fn on_ack<'a>(
1498 &'a mut self,
1499 session_id: SessionId,
1500 receipt_id: u64,
1501 ) -> ServerHookFuture<'a> {
1502 self.ack_calls = self.ack_calls.saturating_add(1);
1503 self.last_ack = Some((session_id, receipt_id));
1504 Box::pin(async { Ok(()) })
1505 }
1506
1507 fn on_metrics<'a>(
1508 &'a mut self,
1509 shard_id: usize,
1510 snapshot: TransportMetricsSnapshot,
1511 dropped_non_critical_events: u64,
1512 ) -> ServerHookFuture<'a> {
1513 self.metrics_calls = self.metrics_calls.saturating_add(1);
1514 self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1515 Box::pin(async { Ok(()) })
1516 }
1517 }
1518
/// Exercises `SessionIdAdapter`: registering peers, looking them up in both
/// directions, the `i32` conversion helpers, and unregistering.
#[test]
fn session_id_adapter_bridges_peer_and_signed_ids() {
    let mut adapter = SessionIdAdapter::new();
    let first_peer = PeerId::from_u64(0x1_0000_0001);
    let second_peer = PeerId::from_u64(0x2_0000_0002);

    let first_session = adapter
        .register_peer(first_peer)
        .expect("first session id allocation should succeed");
    let second_session = adapter
        .register_peer(second_peer)
        .expect("second session id allocation should succeed");

    // Registration order determines the session ids observed here.
    assert_eq!(first_session, 1);
    assert_eq!(second_session, 2);

    // Lookups work in both directions, including through the i32 entry point.
    assert_eq!(adapter.session_id_for_peer(first_peer), Some(first_session));
    assert_eq!(adapter.peer_id_for_session(second_session), Some(second_peer));
    assert_eq!(adapter.peer_id_for_session_i32(2), Some(second_peer));
    assert_eq!(adapter.peer_id_for_session_i32(-1), None);

    // Stateless conversions between session ids and i32 values.
    assert_eq!(SessionIdAdapter::session_id_to_i32(first_session), Some(1));
    assert_eq!(SessionIdAdapter::session_id_from_i32(2), Some(2));
    assert_eq!(SessionIdAdapter::session_id_from_i32(-5), None);

    // Unregistering clears both directions of the mapping.
    assert_eq!(adapter.unregister_peer(first_peer), Some(first_session));
    assert_eq!(adapter.session_id_for_peer(first_peer), None);
    assert_eq!(adapter.peer_id_for_session(first_session), None);
}
1546
1547 #[tokio::test]
1548 async fn dispatch_session_facade_maps_callbacks_and_releases_mapping() -> io::Result<()> {
1549 let mut adapter = SessionIdAdapter::new();
1550 let mut handler = CountingSessionHandler::default();
1551 let peer_id = PeerId::from_u64(0xDEAD_BEEF_F00D);
1552 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 1, 2, 3)), 19133);
1553 let payload = Bytes::from_static(b"\x10\x20session");
1554 let metrics = TransportMetricsSnapshot {
1555 packets_forwarded_total: 5,
1556 bytes_forwarded_total: 80,
1557 ..TransportMetricsSnapshot::default()
1558 };
1559
1560 dispatch_session_facade(
1561 &mut adapter,
1562 &mut handler,
1563 RaknetServerEvent::PeerConnected {
1564 peer_id,
1565 addr,
1566 client_guid: 0xABCD_EF01_0203_0405,
1567 shard_id: 0,
1568 },
1569 )
1570 .await?;
1571
1572 let session_id = adapter
1573 .session_id_for_peer(peer_id)
1574 .expect("session id should be registered after connect");
1575 assert_eq!(session_id, 1);
1576
1577 dispatch_session_facade(
1578 &mut adapter,
1579 &mut handler,
1580 RaknetServerEvent::Packet {
1581 peer_id,
1582 addr,
1583 payload: payload.clone(),
1584 reliability: Reliability::ReliableOrdered,
1585 reliable_index: None,
1586 sequence_index: None,
1587 ordering_index: None,
1588 ordering_channel: None,
1589 },
1590 )
1591 .await?;
1592
1593 dispatch_session_facade(
1594 &mut adapter,
1595 &mut handler,
1596 RaknetServerEvent::ReceiptAcked {
1597 peer_id,
1598 addr,
1599 receipt_id: 44,
1600 },
1601 )
1602 .await?;
1603
1604 dispatch_session_facade(
1605 &mut adapter,
1606 &mut handler,
1607 RaknetServerEvent::Metrics {
1608 shard_id: 0,
1609 snapshot: Box::new(metrics),
1610 dropped_non_critical_events: 7,
1611 },
1612 )
1613 .await?;
1614
1615 dispatch_session_facade(
1616 &mut adapter,
1617 &mut handler,
1618 RaknetServerEvent::PeerDisconnected {
1619 peer_id,
1620 addr,
1621 reason: PeerDisconnectReason::Requested,
1622 },
1623 )
1624 .await?;
1625
1626 assert_eq!(handler.connect_calls, 1);
1627 assert_eq!(handler.packet_calls, 1);
1628 assert_eq!(handler.ack_calls, 1);
1629 assert_eq!(handler.metrics_calls, 1);
1630 assert_eq!(handler.disconnect_calls, 1);
1631 assert_eq!(
1632 handler.last_connect,
1633 Some((1, addr.ip(), addr.port(), 0xABCD_EF01_0203_0405))
1634 );
1635 assert_eq!(handler.last_packet, Some((1, payload)));
1636 assert_eq!(handler.last_ack, Some((1, 44)));
1637 assert_eq!(
1638 handler.last_disconnect,
1639 Some((1, PeerDisconnectReason::Requested))
1640 );
1641 assert_eq!(adapter.session_id_for_peer(peer_id), None);
1642 assert_eq!(adapter.peer_id_for_session(1), None);
1643
1644 let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1645 .last_metrics
1646 .expect("metrics callback should store last snapshot");
1647 assert_eq!(metrics_shard, 0);
1648 assert_eq!(metrics_dropped, 7);
1649 assert_eq!(metrics_snapshot.packets_forwarded_total, 5);
1650 assert_eq!(metrics_snapshot.bytes_forwarded_total, 80);
1651
1652 Ok(())
1653 }
1654}