1use std::collections::VecDeque;
10use std::future::Future;
11use std::io;
12use std::net::{IpAddr, SocketAddr};
13use std::pin::Pin;
14
15use bytes::Bytes;
16use tracing::{debug, info, warn};
17
18use crate::concurrency::{FastMap, fast_map};
19use crate::error::ConfigValidationError;
20use crate::handshake::OfflinePacket;
21use crate::protocol::reliability::Reliability;
22use crate::protocol::sequence24::Sequence24;
23use crate::session::RakPriority;
24use crate::transport::{
25 RemoteDisconnectReason, ShardedRuntimeConfig, ShardedRuntimeEvent, ShardedRuntimeHandle,
26 ShardedSendPayload, TransportConfig, TransportEvent, TransportMetricsSnapshot,
27 spawn_sharded_runtime,
28};
29
/// Opaque identifier the server assigns to each tracked peer.
///
/// Internally a plain `u64`; use [`PeerId::from_u64`] and [`PeerId::as_u64`]
/// to convert to and from the raw value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PeerId(u64);

impl PeerId {
    /// Wraps a raw `u64` as a peer id.
    pub const fn from_u64(value: u64) -> Self {
        PeerId(value)
    }

    /// Returns the raw `u64` carried by this id.
    pub const fn as_u64(self) -> u64 {
        let PeerId(raw) = self;
        raw
    }
}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub struct SendOptions {
49 pub reliability: Reliability,
51 pub channel: u8,
53 pub priority: RakPriority,
55}
56
57impl Default for SendOptions {
58 fn default() -> Self {
59 Self {
60 reliability: Reliability::ReliableOrdered,
61 channel: 0,
62 priority: RakPriority::High,
63 }
64 }
65}
66
/// Why a peer stopped being tracked by the server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerDisconnectReason {
    /// The local side asked for the disconnect.
    Requested,
    /// The remote side sent a disconnection notification.
    RemoteDisconnectionNotification {
        /// Reason code carried by the notification, when one was provided.
        reason_code: Option<u8>,
    },
    /// The connection to the remote side was detected as lost.
    RemoteDetectLostConnection,
    /// The runtime worker that owned the peer's shard stopped.
    WorkerStopped {
        /// Index of the shard whose worker stopped.
        shard_id: usize,
    },
}
75
76#[derive(Debug)]
77pub enum RaknetServerEvent {
79 PeerConnected {
80 peer_id: PeerId,
81 addr: SocketAddr,
82 client_guid: u64,
83 shard_id: usize,
84 },
85 PeerDisconnected {
86 peer_id: PeerId,
87 addr: SocketAddr,
88 reason: PeerDisconnectReason,
89 },
90 Packet {
91 peer_id: PeerId,
92 addr: SocketAddr,
93 payload: Bytes,
94 reliability: Reliability,
95 reliable_index: Option<Sequence24>,
96 sequence_index: Option<Sequence24>,
97 ordering_index: Option<Sequence24>,
98 ordering_channel: Option<u8>,
99 },
100 OfflinePacket {
101 addr: SocketAddr,
102 packet: OfflinePacket,
103 },
104 ReceiptAcked {
105 peer_id: PeerId,
106 addr: SocketAddr,
107 receipt_id: u64,
108 },
109 PeerRateLimited {
110 addr: SocketAddr,
111 },
112 SessionLimitReached {
113 addr: SocketAddr,
114 },
115 ProxyDropped {
116 addr: SocketAddr,
117 },
118 DecodeError {
119 addr: SocketAddr,
120 error: String,
121 },
122 WorkerError {
123 shard_id: usize,
124 message: String,
125 },
126 WorkerStopped {
127 shard_id: usize,
128 },
129 Metrics {
130 shard_id: usize,
131 snapshot: Box<TransportMetricsSnapshot>,
132 dropped_non_critical_events: u64,
133 },
134}
135
136impl RaknetServerEvent {
137 pub fn metrics_snapshot(&self) -> Option<(usize, &TransportMetricsSnapshot, u64)> {
139 match self {
140 Self::Metrics {
141 shard_id,
142 snapshot,
143 dropped_non_critical_events,
144 } => Some((*shard_id, snapshot.as_ref(), *dropped_non_critical_events)),
145 _ => None,
146 }
147 }
148}
149
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub struct ConnectEvent {
153 pub peer_id: PeerId,
154 pub addr: SocketAddr,
155 pub client_guid: u64,
156 pub shard_id: usize,
157}
158
159#[derive(Debug, Clone)]
160pub struct PacketEvent {
162 pub peer_id: PeerId,
163 pub addr: SocketAddr,
164 pub payload: Bytes,
165 pub reliability: Reliability,
166 pub reliable_index: Option<Sequence24>,
167 pub sequence_index: Option<Sequence24>,
168 pub ordering_index: Option<Sequence24>,
169 pub ordering_channel: Option<u8>,
170}
171
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
173pub struct DisconnectEvent {
175 pub peer_id: PeerId,
176 pub addr: SocketAddr,
177 pub reason: PeerDisconnectReason,
178}
179
/// Boxed, pinned, `Send` future returned by every server hook.
///
/// Resolves to `io::Result<()>` and may borrow data for the lifetime `'a`.
pub type ServerHookFuture<'a> = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
182
183type ConnectHandler =
184 Box<dyn for<'a> FnMut(&'a mut RaknetServer, ConnectEvent) -> ServerHookFuture<'a> + Send>;
185type PacketHandler =
186 Box<dyn for<'a> FnMut(&'a mut RaknetServer, PacketEvent) -> ServerHookFuture<'a> + Send>;
187type DisconnectHandler =
188 Box<dyn for<'a> FnMut(&'a mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'a> + Send>;
189
190pub struct ServerFacade<'a> {
194 server: &'a mut RaknetServer,
195 on_connect: Option<ConnectHandler>,
196 on_packet: Option<PacketHandler>,
197 on_disconnect: Option<DisconnectHandler>,
198}
199
200impl<'a> ServerFacade<'a> {
201 pub fn new(server: &'a mut RaknetServer) -> Self {
203 Self {
204 server,
205 on_connect: None,
206 on_packet: None,
207 on_disconnect: None,
208 }
209 }
210
211 pub fn on_connect<F>(mut self, handler: F) -> Self
213 where
214 F: for<'b> FnMut(&'b mut RaknetServer, ConnectEvent) -> ServerHookFuture<'b>
215 + Send
216 + 'static,
217 {
218 self.on_connect = Some(Box::new(handler));
219 self
220 }
221
222 pub fn on_packet<F>(mut self, handler: F) -> Self
224 where
225 F: for<'b> FnMut(&'b mut RaknetServer, PacketEvent) -> ServerHookFuture<'b>
226 + Send
227 + 'static,
228 {
229 self.on_packet = Some(Box::new(handler));
230 self
231 }
232
233 pub fn on_disconnect<F>(mut self, handler: F) -> Self
235 where
236 F: for<'b> FnMut(&'b mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'b>
237 + Send
238 + 'static,
239 {
240 self.on_disconnect = Some(Box::new(handler));
241 self
242 }
243
244 pub async fn next(&mut self) -> io::Result<bool> {
248 let Some(event) = self.server.next_event().await else {
249 return Ok(false);
250 };
251 self.dispatch(event).await?;
252 Ok(true)
253 }
254
255 pub async fn run(&mut self) -> io::Result<()> {
257 while self.next().await? {}
258 Ok(())
259 }
260
261 pub fn server(&self) -> &RaknetServer {
263 self.server
264 }
265
266 pub fn server_mut(&mut self) -> &mut RaknetServer {
268 self.server
269 }
270
271 async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
272 match event {
273 RaknetServerEvent::PeerConnected {
274 peer_id,
275 addr,
276 client_guid,
277 shard_id,
278 } => {
279 if let Some(handler) = self.on_connect.as_mut() {
280 handler(
281 self.server,
282 ConnectEvent {
283 peer_id,
284 addr,
285 client_guid,
286 shard_id,
287 },
288 )
289 .await?;
290 }
291 }
292 RaknetServerEvent::Packet {
293 peer_id,
294 addr,
295 payload,
296 reliability,
297 reliable_index,
298 sequence_index,
299 ordering_index,
300 ordering_channel,
301 } => {
302 if let Some(handler) = self.on_packet.as_mut() {
303 handler(
304 self.server,
305 PacketEvent {
306 peer_id,
307 addr,
308 payload,
309 reliability,
310 reliable_index,
311 sequence_index,
312 ordering_index,
313 ordering_channel,
314 },
315 )
316 .await?;
317 }
318 }
319 RaknetServerEvent::PeerDisconnected {
320 peer_id,
321 addr,
322 reason,
323 } => {
324 if let Some(handler) = self.on_disconnect.as_mut() {
325 handler(
326 self.server,
327 DisconnectEvent {
328 peer_id,
329 addr,
330 reason,
331 },
332 )
333 .await?;
334 }
335 }
336 RaknetServerEvent::OfflinePacket { .. }
337 | RaknetServerEvent::ReceiptAcked { .. }
338 | RaknetServerEvent::PeerRateLimited { .. }
339 | RaknetServerEvent::SessionLimitReached { .. }
340 | RaknetServerEvent::ProxyDropped { .. }
341 | RaknetServerEvent::DecodeError { .. }
342 | RaknetServerEvent::WorkerError { .. }
343 | RaknetServerEvent::WorkerStopped { .. }
344 | RaknetServerEvent::Metrics { .. } => {}
345 }
346
347 Ok(())
348 }
349}
350
351pub trait EventFacadeHandler {
356 fn on_connect<'a>(
357 &'a mut self,
358 _session_id: u64,
359 _addr: IpAddr,
360 _port: u16,
361 _client_guid: u64,
362 ) -> ServerHookFuture<'a> {
363 Box::pin(async { Ok(()) })
364 }
365
366 fn on_disconnect<'a>(
367 &'a mut self,
368 _session_id: u64,
369 _reason: PeerDisconnectReason,
370 ) -> ServerHookFuture<'a> {
371 Box::pin(async { Ok(()) })
372 }
373
374 fn on_packet<'a>(&'a mut self, _session_id: u64, _payload: Bytes) -> ServerHookFuture<'a> {
375 Box::pin(async { Ok(()) })
376 }
377
378 fn on_ack<'a>(&'a mut self, _session_id: u64, _receipt_id: u64) -> ServerHookFuture<'a> {
379 Box::pin(async { Ok(()) })
380 }
381
382 fn on_metrics<'a>(
383 &'a mut self,
384 _shard_id: usize,
385 _snapshot: TransportMetricsSnapshot,
386 _dropped_non_critical_events: u64,
387 ) -> ServerHookFuture<'a> {
388 Box::pin(async { Ok(()) })
389 }
390}
391
392pub async fn dispatch_event_facade<H: EventFacadeHandler>(
394 handler: &mut H,
395 event: RaknetServerEvent,
396) -> io::Result<()> {
397 match event {
398 RaknetServerEvent::PeerConnected {
399 peer_id,
400 addr,
401 client_guid,
402 ..
403 } => {
404 handler
405 .on_connect(peer_id.as_u64(), addr.ip(), addr.port(), client_guid)
406 .await?;
407 }
408 RaknetServerEvent::PeerDisconnected {
409 peer_id, reason, ..
410 } => {
411 handler.on_disconnect(peer_id.as_u64(), reason).await?;
412 }
413 RaknetServerEvent::Packet {
414 peer_id, payload, ..
415 } => {
416 handler.on_packet(peer_id.as_u64(), payload).await?;
417 }
418 RaknetServerEvent::ReceiptAcked {
419 peer_id,
420 receipt_id,
421 ..
422 } => {
423 handler.on_ack(peer_id.as_u64(), receipt_id).await?;
424 }
425 RaknetServerEvent::Metrics {
426 shard_id,
427 snapshot,
428 dropped_non_critical_events,
429 } => {
430 handler
431 .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
432 .await?;
433 }
434 RaknetServerEvent::OfflinePacket { .. }
435 | RaknetServerEvent::PeerRateLimited { .. }
436 | RaknetServerEvent::SessionLimitReached { .. }
437 | RaknetServerEvent::ProxyDropped { .. }
438 | RaknetServerEvent::DecodeError { .. }
439 | RaknetServerEvent::WorkerError { .. }
440 | RaknetServerEvent::WorkerStopped { .. } => {}
441 }
442
443 Ok(())
444}
445
446pub struct EventFacade<'a, H: EventFacadeHandler> {
448 server: &'a mut RaknetServer,
449 handler: &'a mut H,
450}
451
452impl<'a, H: EventFacadeHandler> EventFacade<'a, H> {
453 pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
455 Self { server, handler }
456 }
457
458 pub async fn next(&mut self) -> io::Result<bool> {
462 let Some(event) = self.server.next_event().await else {
463 return Ok(false);
464 };
465 self.dispatch(event).await?;
466 Ok(true)
467 }
468
469 pub async fn run(&mut self) -> io::Result<()> {
471 while self.next().await? {}
472 Ok(())
473 }
474
475 pub fn server(&self) -> &RaknetServer {
477 self.server
478 }
479
480 pub fn server_mut(&mut self) -> &mut RaknetServer {
482 self.server
483 }
484
485 pub fn handler(&self) -> &H {
487 self.handler
488 }
489
490 pub fn handler_mut(&mut self) -> &mut H {
492 self.handler
493 }
494
495 async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
496 dispatch_event_facade(self.handler, event).await
497 }
498}
499
500pub type SessionId = u32;
501
502#[derive(Debug)]
503pub struct SessionIdAdapter {
505 peer_to_session: FastMap<PeerId, SessionId>,
506 session_to_peer: FastMap<SessionId, PeerId>,
507 next_session_id: SessionId,
508}
509
510impl Default for SessionIdAdapter {
511 fn default() -> Self {
512 Self::new()
513 }
514}
515
516impl SessionIdAdapter {
517 pub fn new() -> Self {
519 Self {
520 peer_to_session: fast_map(),
521 session_to_peer: fast_map(),
522 next_session_id: 1,
523 }
524 }
525
526 pub fn len(&self) -> usize {
528 self.peer_to_session.len()
529 }
530
531 pub fn is_empty(&self) -> bool {
533 self.peer_to_session.is_empty()
534 }
535
536 pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
538 self.peer_to_session.get(&peer_id).map(|entry| *entry)
539 }
540
541 pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
543 self.session_to_peer.get(&session_id).map(|entry| *entry)
544 }
545
546 pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
548 let session_id = Self::session_id_from_i32(session_id)?;
549 self.peer_id_for_session(session_id)
550 }
551
552 pub fn register_peer(&mut self, peer_id: PeerId) -> io::Result<SessionId> {
556 if let Some(existing) = self.session_id_for_peer(peer_id) {
557 return Ok(existing);
558 }
559
560 let session_id = self.allocate_session_id()?;
561 self.peer_to_session.insert(peer_id, session_id);
562 self.session_to_peer.insert(session_id, peer_id);
563 Ok(session_id)
564 }
565
566 pub fn unregister_peer(&mut self, peer_id: PeerId) -> Option<SessionId> {
568 let (_, session_id) = self.peer_to_session.remove(&peer_id)?;
569 self.session_to_peer.remove(&session_id);
570 Some(session_id)
571 }
572
573 pub fn clear(&mut self) {
575 self.peer_to_session.clear();
576 self.session_to_peer.clear();
577 self.next_session_id = 1;
578 }
579
580 pub fn session_id_to_i32(session_id: SessionId) -> Option<i32> {
582 i32::try_from(session_id).ok()
583 }
584
585 pub fn session_id_from_i32(session_id: i32) -> Option<SessionId> {
587 u32::try_from(session_id).ok()
588 }
589
590 fn allocate_session_id(&mut self) -> io::Result<SessionId> {
591 let mut candidate = if self.next_session_id == 0 {
592 1
593 } else {
594 self.next_session_id
595 };
596
597 for _ in 0..u32::MAX {
598 if !self.session_to_peer.contains_key(&candidate) {
599 self.next_session_id = candidate.wrapping_add(1);
600 if self.next_session_id == 0 {
601 self.next_session_id = 1;
602 }
603 return Ok(candidate);
604 }
605
606 candidate = candidate.wrapping_add(1);
607 if candidate == 0 {
608 candidate = 1;
609 }
610 }
611
612 Err(io::Error::other("session id space exhausted"))
613 }
614}
615
616pub trait SessionFacadeHandler {
620 fn on_connect<'a>(
621 &'a mut self,
622 _session_id: SessionId,
623 _addr: IpAddr,
624 _port: u16,
625 _client_guid: u64,
626 ) -> ServerHookFuture<'a> {
627 Box::pin(async { Ok(()) })
628 }
629
630 fn on_disconnect<'a>(
631 &'a mut self,
632 _session_id: SessionId,
633 _reason: PeerDisconnectReason,
634 ) -> ServerHookFuture<'a> {
635 Box::pin(async { Ok(()) })
636 }
637
638 fn on_packet<'a>(
639 &'a mut self,
640 _session_id: SessionId,
641 _payload: Bytes,
642 ) -> ServerHookFuture<'a> {
643 Box::pin(async { Ok(()) })
644 }
645
646 fn on_ack<'a>(&'a mut self, _session_id: SessionId, _receipt_id: u64) -> ServerHookFuture<'a> {
647 Box::pin(async { Ok(()) })
648 }
649
650 fn on_metrics<'a>(
651 &'a mut self,
652 _shard_id: usize,
653 _snapshot: TransportMetricsSnapshot,
654 _dropped_non_critical_events: u64,
655 ) -> ServerHookFuture<'a> {
656 Box::pin(async { Ok(()) })
657 }
658}
659
660pub async fn dispatch_session_facade<H: SessionFacadeHandler>(
662 adapter: &mut SessionIdAdapter,
663 handler: &mut H,
664 event: RaknetServerEvent,
665) -> io::Result<()> {
666 match event {
667 RaknetServerEvent::PeerConnected {
668 peer_id,
669 addr,
670 client_guid,
671 ..
672 } => {
673 let session_id = adapter.register_peer(peer_id)?;
674 handler
675 .on_connect(session_id, addr.ip(), addr.port(), client_guid)
676 .await?;
677 }
678 RaknetServerEvent::PeerDisconnected {
679 peer_id, reason, ..
680 } => {
681 if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
682 handler.on_disconnect(session_id, reason).await?;
683 adapter.unregister_peer(peer_id);
684 } else {
685 debug!(
686 peer_id = peer_id.as_u64(),
687 ?reason,
688 "ignoring disconnect for unknown session id mapping"
689 );
690 }
691 }
692 RaknetServerEvent::Packet {
693 peer_id, payload, ..
694 } => {
695 if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
696 handler.on_packet(session_id, payload).await?;
697 } else {
698 debug!(
699 peer_id = peer_id.as_u64(),
700 "dropping packet callback because session id mapping is missing"
701 );
702 }
703 }
704 RaknetServerEvent::ReceiptAcked {
705 peer_id,
706 receipt_id,
707 ..
708 } => {
709 if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
710 handler.on_ack(session_id, receipt_id).await?;
711 } else {
712 debug!(
713 peer_id = peer_id.as_u64(),
714 receipt_id, "dropping ack callback because session id mapping is missing"
715 );
716 }
717 }
718 RaknetServerEvent::Metrics {
719 shard_id,
720 snapshot,
721 dropped_non_critical_events,
722 } => {
723 handler
724 .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
725 .await?;
726 }
727 RaknetServerEvent::OfflinePacket { .. }
728 | RaknetServerEvent::PeerRateLimited { .. }
729 | RaknetServerEvent::SessionLimitReached { .. }
730 | RaknetServerEvent::ProxyDropped { .. }
731 | RaknetServerEvent::DecodeError { .. }
732 | RaknetServerEvent::WorkerError { .. }
733 | RaknetServerEvent::WorkerStopped { .. } => {}
734 }
735
736 Ok(())
737}
738
739pub struct SessionFacade<'a, H: SessionFacadeHandler> {
741 server: &'a mut RaknetServer,
742 handler: &'a mut H,
743 adapter: SessionIdAdapter,
744}
745
746impl<'a, H: SessionFacadeHandler> SessionFacade<'a, H> {
747 pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
749 Self {
750 server,
751 handler,
752 adapter: SessionIdAdapter::new(),
753 }
754 }
755
756 pub fn with_adapter(
758 server: &'a mut RaknetServer,
759 handler: &'a mut H,
760 adapter: SessionIdAdapter,
761 ) -> Self {
762 Self {
763 server,
764 handler,
765 adapter,
766 }
767 }
768
769 pub async fn next(&mut self) -> io::Result<bool> {
773 let Some(event) = self.server.next_event().await else {
774 return Ok(false);
775 };
776 self.dispatch(event).await?;
777 Ok(true)
778 }
779
780 pub async fn run(&mut self) -> io::Result<()> {
782 while self.next().await? {}
783 Ok(())
784 }
785
786 pub fn server(&self) -> &RaknetServer {
788 self.server
789 }
790
791 pub fn server_mut(&mut self) -> &mut RaknetServer {
793 self.server
794 }
795
796 pub fn handler(&self) -> &H {
798 self.handler
799 }
800
801 pub fn handler_mut(&mut self) -> &mut H {
803 self.handler
804 }
805
806 pub fn adapter(&self) -> &SessionIdAdapter {
808 &self.adapter
809 }
810
811 pub fn adapter_mut(&mut self) -> &mut SessionIdAdapter {
813 &mut self.adapter
814 }
815
816 pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
818 self.adapter.session_id_for_peer(peer_id)
819 }
820
821 pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
823 self.adapter.peer_id_for_session(session_id)
824 }
825
826 pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
828 self.adapter.peer_id_for_session_i32(session_id)
829 }
830
831 pub async fn send(
833 &mut self,
834 session_id: SessionId,
835 payload: impl Into<Bytes>,
836 ) -> io::Result<()> {
837 let peer_id = self.resolve_peer_id(session_id)?;
838 self.server.send(peer_id, payload).await
839 }
840
841 pub async fn send_with_options(
843 &mut self,
844 session_id: SessionId,
845 payload: impl Into<Bytes>,
846 options: SendOptions,
847 ) -> io::Result<()> {
848 let peer_id = self.resolve_peer_id(session_id)?;
849 self.server
850 .send_with_options(peer_id, payload, options)
851 .await
852 }
853
854 pub async fn send_with_receipt(
856 &mut self,
857 session_id: SessionId,
858 payload: impl Into<Bytes>,
859 receipt_id: u64,
860 ) -> io::Result<()> {
861 let peer_id = self.resolve_peer_id(session_id)?;
862 self.server
863 .send_with_receipt(peer_id, payload, receipt_id)
864 .await
865 }
866
867 pub async fn disconnect(&mut self, session_id: SessionId) -> io::Result<()> {
869 let peer_id = self.resolve_peer_id(session_id)?;
870 self.server.disconnect(peer_id).await
871 }
872
873 async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
874 dispatch_session_facade(&mut self.adapter, self.handler, event).await
875 }
876
877 fn resolve_peer_id(&self, session_id: SessionId) -> io::Result<PeerId> {
878 self.peer_id_for_session(session_id).ok_or_else(|| {
879 io::Error::new(
880 io::ErrorKind::NotFound,
881 format!("session id {session_id} is not mapped to any peer"),
882 )
883 })
884 }
885}
886
887#[derive(Debug, Clone, Default)]
888pub struct RaknetServerBuilder {
890 transport_config: TransportConfig,
891 runtime_config: ShardedRuntimeConfig,
892}
893
894impl RaknetServerBuilder {
895 pub fn transport_config(mut self, config: TransportConfig) -> Self {
897 self.transport_config = config;
898 self
899 }
900
901 pub fn runtime_config(mut self, config: ShardedRuntimeConfig) -> Self {
903 self.runtime_config = config;
904 self
905 }
906
907 pub fn bind_addr(mut self, bind_addr: SocketAddr) -> Self {
909 self.transport_config.bind_addr = bind_addr;
910 self
911 }
912
913 pub fn shard_count(mut self, shard_count: usize) -> Self {
915 self.runtime_config.shard_count = shard_count.max(1);
916 self
917 }
918
919 pub fn transport_config_mut(&mut self) -> &mut TransportConfig {
921 &mut self.transport_config
922 }
923
924 pub fn runtime_config_mut(&mut self) -> &mut ShardedRuntimeConfig {
926 &mut self.runtime_config
927 }
928
929 pub async fn start(self) -> io::Result<RaknetServer> {
931 self.transport_config
932 .validate()
933 .map_err(invalid_config_io_error)?;
934 self.runtime_config
935 .validate()
936 .map_err(invalid_config_io_error)?;
937 RaknetServer::start_with_configs(self.transport_config, self.runtime_config).await
938 }
939}
940
941#[derive(Debug, Clone, Copy)]
942struct PeerBinding {
943 peer_id: PeerId,
944 shard_id: usize,
945}
946
947pub struct RaknetServer {
948 runtime: ShardedRuntimeHandle,
949 peers_by_addr: FastMap<SocketAddr, PeerBinding>,
950 addrs_by_peer: FastMap<PeerId, SocketAddr>,
951 pending_events: VecDeque<RaknetServerEvent>,
952 next_peer_id: u64,
953}
954
955impl RaknetServer {
956 pub fn builder() -> RaknetServerBuilder {
958 RaknetServerBuilder::default()
959 }
960
961 pub async fn bind(bind_addr: SocketAddr) -> io::Result<Self> {
963 Self::builder().bind_addr(bind_addr).start().await
964 }
965
966 pub fn facade(&mut self) -> ServerFacade<'_> {
968 ServerFacade::new(self)
969 }
970
971 pub fn event_facade<'a, H: EventFacadeHandler>(
973 &'a mut self,
974 handler: &'a mut H,
975 ) -> EventFacade<'a, H> {
976 EventFacade::new(self, handler)
977 }
978
979 pub fn session_facade<'a, H: SessionFacadeHandler>(
981 &'a mut self,
982 handler: &'a mut H,
983 ) -> SessionFacade<'a, H> {
984 SessionFacade::new(self, handler)
985 }
986
987 pub async fn start_with_configs(
989 transport_config: TransportConfig,
990 runtime_config: ShardedRuntimeConfig,
991 ) -> io::Result<Self> {
992 transport_config
993 .validate()
994 .map_err(invalid_config_io_error)?;
995 runtime_config.validate().map_err(invalid_config_io_error)?;
996 let runtime = spawn_sharded_runtime(transport_config, runtime_config).await?;
997 Ok(Self {
998 runtime,
999 peers_by_addr: fast_map(),
1000 addrs_by_peer: fast_map(),
1001 pending_events: VecDeque::new(),
1002 next_peer_id: 1,
1003 })
1004 }
1005
1006 pub fn peer_addr(&self, peer_id: PeerId) -> Option<SocketAddr> {
1008 self.addrs_by_peer.get(&peer_id).map(|addr| *addr)
1009 }
1010
1011 pub fn peer_shard(&self, peer_id: PeerId) -> Option<usize> {
1013 let addr = self.addrs_by_peer.get(&peer_id).map(|addr| *addr)?;
1014 self.peers_by_addr
1015 .get(&addr)
1016 .map(|binding| binding.shard_id)
1017 }
1018
1019 pub fn peer_id_for_addr(&self, addr: SocketAddr) -> Option<PeerId> {
1021 self.peers_by_addr.get(&addr).map(|binding| binding.peer_id)
1022 }
1023
1024 pub async fn send(&self, peer_id: PeerId, payload: impl Into<Bytes>) -> io::Result<()> {
1026 self.send_with_options(peer_id, payload, SendOptions::default())
1027 .await
1028 }
1029
1030 pub async fn send_with_options(
1032 &self,
1033 peer_id: PeerId,
1034 payload: impl Into<Bytes>,
1035 options: SendOptions,
1036 ) -> io::Result<()> {
1037 let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
1038
1039 self.runtime
1040 .send_payload_to_shard(
1041 shard_id,
1042 ShardedSendPayload {
1043 addr,
1044 payload: payload.into(),
1045 reliability: options.reliability,
1046 channel: options.channel,
1047 priority: options.priority,
1048 },
1049 )
1050 .await
1051 }
1052
1053 pub async fn send_with_receipt(
1055 &self,
1056 peer_id: PeerId,
1057 payload: impl Into<Bytes>,
1058 receipt_id: u64,
1059 ) -> io::Result<()> {
1060 self.send_with_options_and_receipt(peer_id, payload, SendOptions::default(), receipt_id)
1061 .await
1062 }
1063
1064 pub async fn send_with_options_and_receipt(
1066 &self,
1067 peer_id: PeerId,
1068 payload: impl Into<Bytes>,
1069 options: SendOptions,
1070 receipt_id: u64,
1071 ) -> io::Result<()> {
1072 let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
1073
1074 self.runtime
1075 .send_payload_to_shard_with_receipt(
1076 shard_id,
1077 ShardedSendPayload {
1078 addr,
1079 payload: payload.into(),
1080 reliability: options.reliability,
1081 channel: options.channel,
1082 priority: options.priority,
1083 },
1084 receipt_id,
1085 )
1086 .await
1087 }
1088
1089 pub async fn disconnect(&mut self, peer_id: PeerId) -> io::Result<()> {
1091 let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
1092 info!(
1093 peer_id = peer_id.as_u64(),
1094 %addr,
1095 shard_id,
1096 "server disconnect requested"
1097 );
1098
1099 self.runtime
1100 .disconnect_peer_from_shard(shard_id, addr)
1101 .await?;
1102 self.remove_peer(addr);
1103 self.pending_events
1104 .push_back(RaknetServerEvent::PeerDisconnected {
1105 peer_id,
1106 addr,
1107 reason: PeerDisconnectReason::Requested,
1108 });
1109 Ok(())
1110 }
1111
1112 pub async fn next_event(&mut self) -> Option<RaknetServerEvent> {
1116 if let Some(event) = self.pending_events.pop_front() {
1117 return Some(event);
1118 }
1119
1120 loop {
1121 let runtime_event = self.runtime.event_rx.recv().await?;
1122 self.enqueue_runtime_event(runtime_event);
1123 if let Some(event) = self.pending_events.pop_front() {
1124 return Some(event);
1125 }
1126 }
1127 }
1128
1129 pub async fn shutdown(self) -> io::Result<()> {
1131 self.runtime.shutdown().await
1132 }
1133
1134 fn enqueue_runtime_event(&mut self, runtime_event: ShardedRuntimeEvent) {
1135 match runtime_event {
1136 ShardedRuntimeEvent::Transport { shard_id, event } => match event {
1137 TransportEvent::PeerDisconnected { addr, reason } => {
1138 if let Some(peer_id) = self.remove_peer(addr) {
1139 let reason = match reason {
1140 RemoteDisconnectReason::DisconnectionNotification { reason_code } => {
1141 PeerDisconnectReason::RemoteDisconnectionNotification {
1142 reason_code,
1143 }
1144 }
1145 RemoteDisconnectReason::DetectLostConnection => {
1146 PeerDisconnectReason::RemoteDetectLostConnection
1147 }
1148 };
1149 info!(
1150 peer_id = peer_id.as_u64(),
1151 %addr,
1152 ?reason,
1153 "peer disconnected"
1154 );
1155 self.pending_events
1156 .push_back(RaknetServerEvent::PeerDisconnected {
1157 peer_id,
1158 addr,
1159 reason,
1160 });
1161 } else {
1162 debug!(
1163 %addr,
1164 ?reason,
1165 "received peer disconnect for unknown address"
1166 );
1167 }
1168 }
1169 TransportEvent::ConnectedFrames {
1170 addr,
1171 client_guid,
1172 frames,
1173 receipts,
1174 ..
1175 } => {
1176 let has_frames = !frames.is_empty();
1177 let has_receipts = !receipts.acked_receipt_ids.is_empty();
1178
1179 if client_guid.is_none() && !has_frames && !has_receipts {
1180 debug!(
1181 %addr,
1182 shard_id,
1183 "ignoring pre-connect transport event without frames/receipts"
1184 );
1185 } else {
1186 let (peer_id, is_new) = self.ensure_peer(addr, shard_id);
1187 if is_new {
1188 let client_guid = client_guid.unwrap_or(peer_id.as_u64());
1189 info!(
1190 peer_id = peer_id.as_u64(),
1191 %addr,
1192 client_guid,
1193 shard_id,
1194 "peer connected"
1195 );
1196 self.pending_events
1197 .push_back(RaknetServerEvent::PeerConnected {
1198 peer_id,
1199 addr,
1200 client_guid,
1201 shard_id,
1202 });
1203 }
1204
1205 for frame in frames {
1206 self.pending_events.push_back(RaknetServerEvent::Packet {
1207 peer_id,
1208 addr,
1209 payload: frame.payload,
1210 reliability: frame.reliability,
1211 reliable_index: frame.reliable_index,
1212 sequence_index: frame.sequence_index,
1213 ordering_index: frame.ordering_index,
1214 ordering_channel: frame.ordering_channel,
1215 });
1216 }
1217
1218 for receipt_id in receipts.acked_receipt_ids {
1219 self.pending_events
1220 .push_back(RaknetServerEvent::ReceiptAcked {
1221 peer_id,
1222 addr,
1223 receipt_id,
1224 });
1225 }
1226 }
1227 }
1228 TransportEvent::RateLimited { addr } => {
1229 warn!(%addr, "peer rate-limited");
1230 self.pending_events
1231 .push_back(RaknetServerEvent::PeerRateLimited { addr });
1232 }
1233 TransportEvent::SessionLimitReached { addr } => {
1234 warn!(%addr, "session limit reached");
1235 self.pending_events
1236 .push_back(RaknetServerEvent::SessionLimitReached { addr });
1237 }
1238 TransportEvent::ConnectedDatagramDroppedNoSession { .. } => {}
1239 TransportEvent::ProxyDropped { addr } => {
1240 debug!(%addr, "proxy router dropped packet");
1241 self.pending_events
1242 .push_back(RaknetServerEvent::ProxyDropped { addr });
1243 }
1244 TransportEvent::DecodeError { addr, error } => {
1245 warn!(%addr, %error, "transport decode error");
1246 self.pending_events
1247 .push_back(RaknetServerEvent::DecodeError {
1248 addr,
1249 error: error.to_string(),
1250 });
1251 }
1252 TransportEvent::OfflinePacket { addr, packet } => {
1253 self.pending_events
1254 .push_back(RaknetServerEvent::OfflinePacket { addr, packet });
1255 }
1256 },
1257 ShardedRuntimeEvent::Metrics {
1258 shard_id,
1259 snapshot,
1260 dropped_non_critical_events,
1261 } => {
1262 if dropped_non_critical_events > 0 {
1263 debug!(
1264 shard_id,
1265 dropped_non_critical_events,
1266 "non-critical runtime events were dropped before metrics emit"
1267 );
1268 }
1269 self.pending_events.push_back(RaknetServerEvent::Metrics {
1270 shard_id,
1271 snapshot,
1272 dropped_non_critical_events,
1273 });
1274 }
1275 ShardedRuntimeEvent::WorkerError { shard_id, message } => {
1276 warn!(shard_id, %message, "runtime worker error");
1277 self.pending_events
1278 .push_back(RaknetServerEvent::WorkerError { shard_id, message });
1279 }
1280 ShardedRuntimeEvent::WorkerStopped { shard_id } => {
1281 warn!(shard_id, "runtime worker stopped");
1282 let mut disconnected = Vec::new();
1283 for peer in self.peers_by_addr.iter() {
1284 let addr = *peer.key();
1285 let binding = *peer.value();
1286 if binding.shard_id == shard_id {
1287 disconnected.push((addr, binding.peer_id));
1288 }
1289 }
1290 for (addr, peer_id) in disconnected {
1291 self.remove_peer(addr);
1292 info!(
1293 peer_id = peer_id.as_u64(),
1294 %addr,
1295 shard_id,
1296 "peer disconnected because worker stopped"
1297 );
1298 self.pending_events
1299 .push_back(RaknetServerEvent::PeerDisconnected {
1300 peer_id,
1301 addr,
1302 reason: PeerDisconnectReason::WorkerStopped { shard_id },
1303 });
1304 }
1305 self.pending_events
1306 .push_back(RaknetServerEvent::WorkerStopped { shard_id });
1307 }
1308 }
1309 }
1310
1311 fn ensure_peer(&mut self, addr: SocketAddr, shard_id: usize) -> (PeerId, bool) {
1312 if let Some(mut binding) = self.peers_by_addr.get_mut(&addr) {
1313 if binding.shard_id != shard_id {
1314 binding.shard_id = shard_id;
1315 }
1316 return (binding.peer_id, false);
1317 }
1318
1319 let peer_id = PeerId(self.next_peer_id);
1320 self.next_peer_id = self.next_peer_id.saturating_add(1);
1321 self.peers_by_addr
1322 .insert(addr, PeerBinding { peer_id, shard_id });
1323 self.addrs_by_peer.insert(peer_id, addr);
1324 (peer_id, true)
1325 }
1326
1327 fn remove_peer(&mut self, addr: SocketAddr) -> Option<PeerId> {
1328 let (_, binding) = self.peers_by_addr.remove(&addr)?;
1329 self.addrs_by_peer.remove(&binding.peer_id);
1330 Some(binding.peer_id)
1331 }
1332
1333 fn resolve_peer_route(&self, peer_id: PeerId) -> io::Result<(SocketAddr, usize)> {
1334 let addr = self
1335 .addrs_by_peer
1336 .get(&peer_id)
1337 .map(|entry| *entry)
1338 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer id not found"))?;
1339 let shard_id = self
1340 .peers_by_addr
1341 .get(&addr)
1342 .map(|binding| binding.shard_id)
1343 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer shard binding missing"))?;
1344 Ok((addr, shard_id))
1345 }
1346}
1347
1348fn invalid_config_io_error(error: ConfigValidationError) -> io::Error {
1349 io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354 use super::{
1355 EventFacadeHandler, PeerDisconnectReason, PeerId, RaknetServer, RaknetServerBuilder,
1356 RaknetServerEvent, ServerHookFuture, SessionFacadeHandler, SessionId, SessionIdAdapter,
1357 dispatch_event_facade, dispatch_session_facade,
1358 };
1359 use crate::protocol::reliability::Reliability;
1360 use crate::transport::{ShardedRuntimeConfig, TransportConfig, TransportMetricsSnapshot};
1361 use bytes::Bytes;
1362 use std::io;
1363 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1364
1365 #[test]
1366 fn builder_mutators_keep_values() {
1367 let builder = RaknetServerBuilder::default().shard_count(4);
1368 assert_eq!(builder.runtime_config.shard_count, 4);
1369 }
1370
1371 #[test]
1372 fn peer_id_roundtrip() {
1373 let peer = PeerId::from_u64(42);
1374 assert_eq!(peer.as_u64(), 42);
1375 }
1376
1377 #[test]
1378 fn builder_type_is_exposed() {
1379 let _ = RaknetServer::builder();
1380 }
1381
1382 #[tokio::test]
1383 async fn start_with_invalid_runtime_config_fails_fast() {
1384 let transport = TransportConfig {
1385 bind_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1386 ..TransportConfig::default()
1387 };
1388 let runtime = ShardedRuntimeConfig {
1389 shard_count: 0,
1390 ..ShardedRuntimeConfig::default()
1391 };
1392
1393 match RaknetServer::start_with_configs(transport, runtime).await {
1394 Ok(_) => panic!("invalid config must fail before runtime start"),
1395 Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
1396 }
1397 }
1398
1399 #[derive(Default)]
1400 struct CountingEventHandler {
1401 connect_calls: usize,
1402 disconnect_calls: usize,
1403 packet_calls: usize,
1404 ack_calls: usize,
1405 metrics_calls: usize,
1406 last_connect: Option<(u64, IpAddr, u16, u64)>,
1407 last_disconnect: Option<(u64, PeerDisconnectReason)>,
1408 last_packet: Option<(u64, Bytes)>,
1409 last_ack: Option<(u64, u64)>,
1410 last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1411 }
1412
1413 impl EventFacadeHandler for CountingEventHandler {
1414 fn on_connect<'a>(
1415 &'a mut self,
1416 session_id: u64,
1417 addr: IpAddr,
1418 port: u16,
1419 client_guid: u64,
1420 ) -> ServerHookFuture<'a> {
1421 self.connect_calls = self.connect_calls.saturating_add(1);
1422 self.last_connect = Some((session_id, addr, port, client_guid));
1423 Box::pin(async { Ok(()) })
1424 }
1425
1426 fn on_disconnect<'a>(
1427 &'a mut self,
1428 session_id: u64,
1429 reason: PeerDisconnectReason,
1430 ) -> ServerHookFuture<'a> {
1431 self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1432 self.last_disconnect = Some((session_id, reason));
1433 Box::pin(async { Ok(()) })
1434 }
1435
1436 fn on_packet<'a>(&'a mut self, session_id: u64, payload: Bytes) -> ServerHookFuture<'a> {
1437 self.packet_calls = self.packet_calls.saturating_add(1);
1438 self.last_packet = Some((session_id, payload));
1439 Box::pin(async { Ok(()) })
1440 }
1441
1442 fn on_ack<'a>(&'a mut self, session_id: u64, receipt_id: u64) -> ServerHookFuture<'a> {
1443 self.ack_calls = self.ack_calls.saturating_add(1);
1444 self.last_ack = Some((session_id, receipt_id));
1445 Box::pin(async { Ok(()) })
1446 }
1447
1448 fn on_metrics<'a>(
1449 &'a mut self,
1450 shard_id: usize,
1451 snapshot: TransportMetricsSnapshot,
1452 dropped_non_critical_events: u64,
1453 ) -> ServerHookFuture<'a> {
1454 self.metrics_calls = self.metrics_calls.saturating_add(1);
1455 self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1456 Box::pin(async { Ok(()) })
1457 }
1458 }
1459
1460 #[tokio::test]
1461 async fn dispatch_event_facade_maps_callbacks() -> io::Result<()> {
1462 let mut handler = CountingEventHandler::default();
1463 let peer_id = PeerId::from_u64(77);
1464 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)), 19132);
1465 let payload = Bytes::from_static(b"\x01\x02event");
1466 let metrics = TransportMetricsSnapshot {
1467 packets_forwarded_total: 33,
1468 bytes_forwarded_total: 1200,
1469 ..TransportMetricsSnapshot::default()
1470 };
1471
1472 dispatch_event_facade(
1473 &mut handler,
1474 RaknetServerEvent::PeerConnected {
1475 peer_id,
1476 addr,
1477 client_guid: 0xAABB_CCDD_EEFF_0011,
1478 shard_id: 2,
1479 },
1480 )
1481 .await?;
1482
1483 dispatch_event_facade(
1484 &mut handler,
1485 RaknetServerEvent::Packet {
1486 peer_id,
1487 addr,
1488 payload: payload.clone(),
1489 reliability: Reliability::ReliableOrdered,
1490 reliable_index: None,
1491 sequence_index: None,
1492 ordering_index: None,
1493 ordering_channel: None,
1494 },
1495 )
1496 .await?;
1497
1498 dispatch_event_facade(
1499 &mut handler,
1500 RaknetServerEvent::ReceiptAcked {
1501 peer_id,
1502 addr,
1503 receipt_id: 9001,
1504 },
1505 )
1506 .await?;
1507
1508 dispatch_event_facade(
1509 &mut handler,
1510 RaknetServerEvent::Metrics {
1511 shard_id: 2,
1512 snapshot: Box::new(metrics),
1513 dropped_non_critical_events: 5,
1514 },
1515 )
1516 .await?;
1517
1518 dispatch_event_facade(
1519 &mut handler,
1520 RaknetServerEvent::PeerDisconnected {
1521 peer_id,
1522 addr,
1523 reason: PeerDisconnectReason::Requested,
1524 },
1525 )
1526 .await?;
1527
1528 dispatch_event_facade(
1529 &mut handler,
1530 RaknetServerEvent::WorkerStopped { shard_id: 0 },
1531 )
1532 .await?;
1533
1534 assert_eq!(handler.connect_calls, 1);
1535 assert_eq!(handler.packet_calls, 1);
1536 assert_eq!(handler.ack_calls, 1);
1537 assert_eq!(handler.metrics_calls, 1);
1538 assert_eq!(handler.disconnect_calls, 1);
1539 assert_eq!(
1540 handler.last_connect,
1541 Some((77, addr.ip(), addr.port(), 0xAABB_CCDD_EEFF_0011))
1542 );
1543 assert_eq!(handler.last_packet, Some((77, payload)));
1544 assert_eq!(handler.last_ack, Some((77, 9001)));
1545 let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1546 .last_metrics
1547 .expect("metrics callback should store last snapshot");
1548 assert_eq!(metrics_shard, 2);
1549 assert_eq!(metrics_dropped, 5);
1550 assert_eq!(metrics_snapshot.packets_forwarded_total, 33);
1551 assert_eq!(metrics_snapshot.bytes_forwarded_total, 1200);
1552 assert_eq!(
1553 handler.last_disconnect,
1554 Some((77, PeerDisconnectReason::Requested))
1555 );
1556
1557 Ok(())
1558 }
1559
1560 #[derive(Default)]
1561 struct CountingSessionHandler {
1562 connect_calls: usize,
1563 disconnect_calls: usize,
1564 packet_calls: usize,
1565 ack_calls: usize,
1566 metrics_calls: usize,
1567 last_connect: Option<(SessionId, IpAddr, u16, u64)>,
1568 last_disconnect: Option<(SessionId, PeerDisconnectReason)>,
1569 last_packet: Option<(SessionId, Bytes)>,
1570 last_ack: Option<(SessionId, u64)>,
1571 last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1572 }
1573
1574 impl SessionFacadeHandler for CountingSessionHandler {
1575 fn on_connect<'a>(
1576 &'a mut self,
1577 session_id: SessionId,
1578 addr: IpAddr,
1579 port: u16,
1580 client_guid: u64,
1581 ) -> ServerHookFuture<'a> {
1582 self.connect_calls = self.connect_calls.saturating_add(1);
1583 self.last_connect = Some((session_id, addr, port, client_guid));
1584 Box::pin(async { Ok(()) })
1585 }
1586
1587 fn on_disconnect<'a>(
1588 &'a mut self,
1589 session_id: SessionId,
1590 reason: PeerDisconnectReason,
1591 ) -> ServerHookFuture<'a> {
1592 self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1593 self.last_disconnect = Some((session_id, reason));
1594 Box::pin(async { Ok(()) })
1595 }
1596
1597 fn on_packet<'a>(
1598 &'a mut self,
1599 session_id: SessionId,
1600 payload: Bytes,
1601 ) -> ServerHookFuture<'a> {
1602 self.packet_calls = self.packet_calls.saturating_add(1);
1603 self.last_packet = Some((session_id, payload));
1604 Box::pin(async { Ok(()) })
1605 }
1606
1607 fn on_ack<'a>(
1608 &'a mut self,
1609 session_id: SessionId,
1610 receipt_id: u64,
1611 ) -> ServerHookFuture<'a> {
1612 self.ack_calls = self.ack_calls.saturating_add(1);
1613 self.last_ack = Some((session_id, receipt_id));
1614 Box::pin(async { Ok(()) })
1615 }
1616
1617 fn on_metrics<'a>(
1618 &'a mut self,
1619 shard_id: usize,
1620 snapshot: TransportMetricsSnapshot,
1621 dropped_non_critical_events: u64,
1622 ) -> ServerHookFuture<'a> {
1623 self.metrics_calls = self.metrics_calls.saturating_add(1);
1624 self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1625 Box::pin(async { Ok(()) })
1626 }
1627 }
1628
1629 #[test]
1630 fn session_id_adapter_bridges_peer_and_signed_ids() {
1631 let mut adapter = SessionIdAdapter::new();
1632 let peer_a = PeerId::from_u64(0x1_0000_0001);
1633 let peer_b = PeerId::from_u64(0x2_0000_0002);
1634
1635 let session_a = adapter
1636 .register_peer(peer_a)
1637 .expect("first session id allocation should succeed");
1638 let session_b = adapter
1639 .register_peer(peer_b)
1640 .expect("second session id allocation should succeed");
1641
1642 assert_eq!(session_a, 1);
1643 assert_eq!(session_b, 2);
1644 assert_eq!(adapter.session_id_for_peer(peer_a), Some(session_a));
1645 assert_eq!(adapter.peer_id_for_session(session_b), Some(peer_b));
1646 assert_eq!(adapter.peer_id_for_session_i32(2), Some(peer_b));
1647 assert_eq!(adapter.peer_id_for_session_i32(-1), None);
1648 assert_eq!(SessionIdAdapter::session_id_to_i32(session_a), Some(1));
1649 assert_eq!(SessionIdAdapter::session_id_from_i32(2), Some(2));
1650 assert_eq!(SessionIdAdapter::session_id_from_i32(-5), None);
1651
1652 assert_eq!(adapter.unregister_peer(peer_a), Some(session_a));
1653 assert_eq!(adapter.session_id_for_peer(peer_a), None);
1654 assert_eq!(adapter.peer_id_for_session(session_a), None);
1655 }
1656
1657 #[tokio::test]
1658 async fn dispatch_session_facade_maps_callbacks_and_releases_mapping() -> io::Result<()> {
1659 let mut adapter = SessionIdAdapter::new();
1660 let mut handler = CountingSessionHandler::default();
1661 let peer_id = PeerId::from_u64(0xDEAD_BEEF_F00D);
1662 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 1, 2, 3)), 19133);
1663 let payload = Bytes::from_static(b"\x10\x20session");
1664 let metrics = TransportMetricsSnapshot {
1665 packets_forwarded_total: 5,
1666 bytes_forwarded_total: 80,
1667 ..TransportMetricsSnapshot::default()
1668 };
1669
1670 dispatch_session_facade(
1671 &mut adapter,
1672 &mut handler,
1673 RaknetServerEvent::PeerConnected {
1674 peer_id,
1675 addr,
1676 client_guid: 0xABCD_EF01_0203_0405,
1677 shard_id: 0,
1678 },
1679 )
1680 .await?;
1681
1682 let session_id = adapter
1683 .session_id_for_peer(peer_id)
1684 .expect("session id should be registered after connect");
1685 assert_eq!(session_id, 1);
1686
1687 dispatch_session_facade(
1688 &mut adapter,
1689 &mut handler,
1690 RaknetServerEvent::Packet {
1691 peer_id,
1692 addr,
1693 payload: payload.clone(),
1694 reliability: Reliability::ReliableOrdered,
1695 reliable_index: None,
1696 sequence_index: None,
1697 ordering_index: None,
1698 ordering_channel: None,
1699 },
1700 )
1701 .await?;
1702
1703 dispatch_session_facade(
1704 &mut adapter,
1705 &mut handler,
1706 RaknetServerEvent::ReceiptAcked {
1707 peer_id,
1708 addr,
1709 receipt_id: 44,
1710 },
1711 )
1712 .await?;
1713
1714 dispatch_session_facade(
1715 &mut adapter,
1716 &mut handler,
1717 RaknetServerEvent::Metrics {
1718 shard_id: 0,
1719 snapshot: Box::new(metrics),
1720 dropped_non_critical_events: 7,
1721 },
1722 )
1723 .await?;
1724
1725 dispatch_session_facade(
1726 &mut adapter,
1727 &mut handler,
1728 RaknetServerEvent::PeerDisconnected {
1729 peer_id,
1730 addr,
1731 reason: PeerDisconnectReason::Requested,
1732 },
1733 )
1734 .await?;
1735
1736 assert_eq!(handler.connect_calls, 1);
1737 assert_eq!(handler.packet_calls, 1);
1738 assert_eq!(handler.ack_calls, 1);
1739 assert_eq!(handler.metrics_calls, 1);
1740 assert_eq!(handler.disconnect_calls, 1);
1741 assert_eq!(
1742 handler.last_connect,
1743 Some((1, addr.ip(), addr.port(), 0xABCD_EF01_0203_0405))
1744 );
1745 assert_eq!(handler.last_packet, Some((1, payload)));
1746 assert_eq!(handler.last_ack, Some((1, 44)));
1747 assert_eq!(
1748 handler.last_disconnect,
1749 Some((1, PeerDisconnectReason::Requested))
1750 );
1751 assert_eq!(adapter.session_id_for_peer(peer_id), None);
1752 assert_eq!(adapter.peer_id_for_session(1), None);
1753
1754 let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1755 .last_metrics
1756 .expect("metrics callback should store last snapshot");
1757 assert_eq!(metrics_shard, 0);
1758 assert_eq!(metrics_dropped, 7);
1759 assert_eq!(metrics_snapshot.packets_forwarded_total, 5);
1760 assert_eq!(metrics_snapshot.bytes_forwarded_total, 80);
1761
1762 Ok(())
1763 }
1764}