Skip to main content

raknet_rust/
server.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::io;
4use std::net::{IpAddr, SocketAddr};
5use std::pin::Pin;
6
7use bytes::Bytes;
8use tracing::{debug, info, warn};
9
10use crate::concurrency::{FastMap, fast_map};
11use crate::error::ConfigValidationError;
12use crate::handshake::OfflinePacket;
13use crate::protocol::reliability::Reliability;
14use crate::protocol::sequence24::Sequence24;
15use crate::session::RakPriority;
16use crate::transport::{
17    RemoteDisconnectReason, ShardedRuntimeConfig, ShardedRuntimeEvent, ShardedRuntimeHandle,
18    ShardedSendPayload, TransportConfig, TransportEvent, TransportMetricsSnapshot,
19    spawn_sharded_runtime,
20};
21
/// Identifier for a peer, as carried by server events and accepted by the
/// send/disconnect APIs in this module.
///
/// It is a thin wrapper around a raw `u64`; convert with [`PeerId::from_u64`]
/// and [`PeerId::as_u64`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PeerId(u64);

impl PeerId {
    /// Wraps a raw `u64` as a `PeerId`.
    pub const fn from_u64(value: u64) -> Self {
        PeerId(value)
    }

    /// Returns the raw `u64` stored in this identifier.
    pub const fn as_u64(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub struct SendOptions {
37    pub reliability: Reliability,
38    pub channel: u8,
39    pub priority: RakPriority,
40}
41
42impl Default for SendOptions {
43    fn default() -> Self {
44        Self {
45            reliability: Reliability::ReliableOrdered,
46            channel: 0,
47            priority: RakPriority::High,
48        }
49    }
50}
51
/// Why a peer stopped being tracked by the server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerDisconnectReason {
    /// The local side asked for the disconnect (see `RaknetServer::disconnect`).
    Requested,
    /// The transport reported a remote disconnection notification, with the
    /// reason code it carried, if any.
    RemoteDisconnectionNotification { reason_code: Option<u8> },
    /// The transport reported that the connection to the remote was detected as lost.
    RemoteDetectLostConnection,
    /// NOTE(review): presumably emitted when the worker for `shard_id` stops;
    /// the producing code is not in this part of the file.
    WorkerStopped { shard_id: usize },
}
59
60#[derive(Debug)]
61pub enum RaknetServerEvent {
62    PeerConnected {
63        peer_id: PeerId,
64        addr: SocketAddr,
65        client_guid: u64,
66        shard_id: usize,
67    },
68    PeerDisconnected {
69        peer_id: PeerId,
70        addr: SocketAddr,
71        reason: PeerDisconnectReason,
72    },
73    Packet {
74        peer_id: PeerId,
75        addr: SocketAddr,
76        payload: Bytes,
77        reliability: Reliability,
78        reliable_index: Option<Sequence24>,
79        sequence_index: Option<Sequence24>,
80        ordering_index: Option<Sequence24>,
81        ordering_channel: Option<u8>,
82    },
83    OfflinePacket {
84        addr: SocketAddr,
85        packet: OfflinePacket,
86    },
87    ReceiptAcked {
88        peer_id: PeerId,
89        addr: SocketAddr,
90        receipt_id: u64,
91    },
92    PeerRateLimited {
93        addr: SocketAddr,
94    },
95    SessionLimitReached {
96        addr: SocketAddr,
97    },
98    ProxyDropped {
99        addr: SocketAddr,
100    },
101    DecodeError {
102        addr: SocketAddr,
103        error: String,
104    },
105    WorkerError {
106        shard_id: usize,
107        message: String,
108    },
109    WorkerStopped {
110        shard_id: usize,
111    },
112    Metrics {
113        shard_id: usize,
114        snapshot: Box<TransportMetricsSnapshot>,
115        dropped_non_critical_events: u64,
116    },
117}
118
119impl RaknetServerEvent {
120    pub fn metrics_snapshot(&self) -> Option<(usize, &TransportMetricsSnapshot, u64)> {
121        match self {
122            Self::Metrics {
123                shard_id,
124                snapshot,
125                dropped_non_critical_events,
126            } => Some((*shard_id, snapshot.as_ref(), *dropped_non_critical_events)),
127            _ => None,
128        }
129    }
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub struct ConnectEvent {
134    pub peer_id: PeerId,
135    pub addr: SocketAddr,
136    pub client_guid: u64,
137    pub shard_id: usize,
138}
139
140#[derive(Debug, Clone)]
141pub struct PacketEvent {
142    pub peer_id: PeerId,
143    pub addr: SocketAddr,
144    pub payload: Bytes,
145    pub reliability: Reliability,
146    pub reliable_index: Option<Sequence24>,
147    pub sequence_index: Option<Sequence24>,
148    pub ordering_index: Option<Sequence24>,
149    pub ordering_channel: Option<u8>,
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153pub struct DisconnectEvent {
154    pub peer_id: PeerId,
155    pub addr: SocketAddr,
156    pub reason: PeerDisconnectReason,
157}
158
/// Boxed, pinned, `Send` future returned by every hook in this module.
///
/// It resolves to `io::Result<()>`; the `'a` lifetime lets the future borrow
/// from the arguments the hook was called with.
pub type ServerHookFuture<'a> = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
160
161type ConnectHandler =
162    Box<dyn for<'a> FnMut(&'a mut RaknetServer, ConnectEvent) -> ServerHookFuture<'a> + Send>;
163type PacketHandler =
164    Box<dyn for<'a> FnMut(&'a mut RaknetServer, PacketEvent) -> ServerHookFuture<'a> + Send>;
165type DisconnectHandler =
166    Box<dyn for<'a> FnMut(&'a mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'a> + Send>;
167
168pub struct ServerFacade<'a> {
169    server: &'a mut RaknetServer,
170    on_connect: Option<ConnectHandler>,
171    on_packet: Option<PacketHandler>,
172    on_disconnect: Option<DisconnectHandler>,
173}
174
175impl<'a> ServerFacade<'a> {
176    pub fn new(server: &'a mut RaknetServer) -> Self {
177        Self {
178            server,
179            on_connect: None,
180            on_packet: None,
181            on_disconnect: None,
182        }
183    }
184
185    pub fn on_connect<F>(mut self, handler: F) -> Self
186    where
187        F: for<'b> FnMut(&'b mut RaknetServer, ConnectEvent) -> ServerHookFuture<'b>
188            + Send
189            + 'static,
190    {
191        self.on_connect = Some(Box::new(handler));
192        self
193    }
194
195    pub fn on_packet<F>(mut self, handler: F) -> Self
196    where
197        F: for<'b> FnMut(&'b mut RaknetServer, PacketEvent) -> ServerHookFuture<'b>
198            + Send
199            + 'static,
200    {
201        self.on_packet = Some(Box::new(handler));
202        self
203    }
204
205    pub fn on_disconnect<F>(mut self, handler: F) -> Self
206    where
207        F: for<'b> FnMut(&'b mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'b>
208            + Send
209            + 'static,
210    {
211        self.on_disconnect = Some(Box::new(handler));
212        self
213    }
214
215    pub async fn next(&mut self) -> io::Result<bool> {
216        let Some(event) = self.server.next_event().await else {
217            return Ok(false);
218        };
219        self.dispatch(event).await?;
220        Ok(true)
221    }
222
223    pub async fn run(&mut self) -> io::Result<()> {
224        while self.next().await? {}
225        Ok(())
226    }
227
228    pub fn server(&self) -> &RaknetServer {
229        self.server
230    }
231
232    pub fn server_mut(&mut self) -> &mut RaknetServer {
233        self.server
234    }
235
236    async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
237        match event {
238            RaknetServerEvent::PeerConnected {
239                peer_id,
240                addr,
241                client_guid,
242                shard_id,
243            } => {
244                if let Some(handler) = self.on_connect.as_mut() {
245                    handler(
246                        self.server,
247                        ConnectEvent {
248                            peer_id,
249                            addr,
250                            client_guid,
251                            shard_id,
252                        },
253                    )
254                    .await?;
255                }
256            }
257            RaknetServerEvent::Packet {
258                peer_id,
259                addr,
260                payload,
261                reliability,
262                reliable_index,
263                sequence_index,
264                ordering_index,
265                ordering_channel,
266            } => {
267                if let Some(handler) = self.on_packet.as_mut() {
268                    handler(
269                        self.server,
270                        PacketEvent {
271                            peer_id,
272                            addr,
273                            payload,
274                            reliability,
275                            reliable_index,
276                            sequence_index,
277                            ordering_index,
278                            ordering_channel,
279                        },
280                    )
281                    .await?;
282                }
283            }
284            RaknetServerEvent::PeerDisconnected {
285                peer_id,
286                addr,
287                reason,
288            } => {
289                if let Some(handler) = self.on_disconnect.as_mut() {
290                    handler(
291                        self.server,
292                        DisconnectEvent {
293                            peer_id,
294                            addr,
295                            reason,
296                        },
297                    )
298                    .await?;
299                }
300            }
301            RaknetServerEvent::OfflinePacket { .. }
302            | RaknetServerEvent::ReceiptAcked { .. }
303            | RaknetServerEvent::PeerRateLimited { .. }
304            | RaknetServerEvent::SessionLimitReached { .. }
305            | RaknetServerEvent::ProxyDropped { .. }
306            | RaknetServerEvent::DecodeError { .. }
307            | RaknetServerEvent::WorkerError { .. }
308            | RaknetServerEvent::WorkerStopped { .. }
309            | RaknetServerEvent::Metrics { .. } => {}
310        }
311
312        Ok(())
313    }
314}
315
316/// Event-driven callback surface that mirrors RakLibInterface-style hooks.
317///
318/// This trait is intentionally event-driven and is fed from `RaknetServerEvent`.
319/// All methods are optional and default to no-op.
320pub trait EventFacadeHandler {
321    fn on_connect<'a>(
322        &'a mut self,
323        _session_id: u64,
324        _addr: IpAddr,
325        _port: u16,
326        _client_guid: u64,
327    ) -> ServerHookFuture<'a> {
328        Box::pin(async { Ok(()) })
329    }
330
331    fn on_disconnect<'a>(
332        &'a mut self,
333        _session_id: u64,
334        _reason: PeerDisconnectReason,
335    ) -> ServerHookFuture<'a> {
336        Box::pin(async { Ok(()) })
337    }
338
339    fn on_packet<'a>(&'a mut self, _session_id: u64, _payload: Bytes) -> ServerHookFuture<'a> {
340        Box::pin(async { Ok(()) })
341    }
342
343    fn on_ack<'a>(&'a mut self, _session_id: u64, _receipt_id: u64) -> ServerHookFuture<'a> {
344        Box::pin(async { Ok(()) })
345    }
346
347    fn on_metrics<'a>(
348        &'a mut self,
349        _shard_id: usize,
350        _snapshot: TransportMetricsSnapshot,
351        _dropped_non_critical_events: u64,
352    ) -> ServerHookFuture<'a> {
353        Box::pin(async { Ok(()) })
354    }
355}
356
357pub async fn dispatch_event_facade<H: EventFacadeHandler>(
358    handler: &mut H,
359    event: RaknetServerEvent,
360) -> io::Result<()> {
361    match event {
362        RaknetServerEvent::PeerConnected {
363            peer_id,
364            addr,
365            client_guid,
366            ..
367        } => {
368            handler
369                .on_connect(peer_id.as_u64(), addr.ip(), addr.port(), client_guid)
370                .await?;
371        }
372        RaknetServerEvent::PeerDisconnected {
373            peer_id, reason, ..
374        } => {
375            handler.on_disconnect(peer_id.as_u64(), reason).await?;
376        }
377        RaknetServerEvent::Packet {
378            peer_id, payload, ..
379        } => {
380            handler.on_packet(peer_id.as_u64(), payload).await?;
381        }
382        RaknetServerEvent::ReceiptAcked {
383            peer_id,
384            receipt_id,
385            ..
386        } => {
387            handler.on_ack(peer_id.as_u64(), receipt_id).await?;
388        }
389        RaknetServerEvent::Metrics {
390            shard_id,
391            snapshot,
392            dropped_non_critical_events,
393        } => {
394            handler
395                .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
396                .await?;
397        }
398        RaknetServerEvent::OfflinePacket { .. }
399        | RaknetServerEvent::PeerRateLimited { .. }
400        | RaknetServerEvent::SessionLimitReached { .. }
401        | RaknetServerEvent::ProxyDropped { .. }
402        | RaknetServerEvent::DecodeError { .. }
403        | RaknetServerEvent::WorkerError { .. }
404        | RaknetServerEvent::WorkerStopped { .. } => {}
405    }
406
407    Ok(())
408}
409
410pub struct EventFacade<'a, H: EventFacadeHandler> {
411    server: &'a mut RaknetServer,
412    handler: &'a mut H,
413}
414
415impl<'a, H: EventFacadeHandler> EventFacade<'a, H> {
416    pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
417        Self { server, handler }
418    }
419
420    pub async fn next(&mut self) -> io::Result<bool> {
421        let Some(event) = self.server.next_event().await else {
422            return Ok(false);
423        };
424        self.dispatch(event).await?;
425        Ok(true)
426    }
427
428    pub async fn run(&mut self) -> io::Result<()> {
429        while self.next().await? {}
430        Ok(())
431    }
432
433    pub fn server(&self) -> &RaknetServer {
434        self.server
435    }
436
437    pub fn server_mut(&mut self) -> &mut RaknetServer {
438        self.server
439    }
440
441    pub fn handler(&self) -> &H {
442        self.handler
443    }
444
445    pub fn handler_mut(&mut self) -> &mut H {
446        self.handler
447    }
448
449    async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
450        dispatch_event_facade(self.handler, event).await
451    }
452}
453
/// Compact 32-bit session identifier handed out by `SessionIdAdapter` in place
/// of the 64-bit `PeerId`.
pub type SessionId = u32;
455
456#[derive(Debug)]
457pub struct SessionIdAdapter {
458    peer_to_session: FastMap<PeerId, SessionId>,
459    session_to_peer: FastMap<SessionId, PeerId>,
460    next_session_id: SessionId,
461}
462
463impl Default for SessionIdAdapter {
464    fn default() -> Self {
465        Self::new()
466    }
467}
468
469impl SessionIdAdapter {
470    pub fn new() -> Self {
471        Self {
472            peer_to_session: fast_map(),
473            session_to_peer: fast_map(),
474            next_session_id: 1,
475        }
476    }
477
478    pub fn len(&self) -> usize {
479        self.peer_to_session.len()
480    }
481
482    pub fn is_empty(&self) -> bool {
483        self.peer_to_session.is_empty()
484    }
485
486    pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
487        self.peer_to_session.get(&peer_id).map(|entry| *entry)
488    }
489
490    pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
491        self.session_to_peer.get(&session_id).map(|entry| *entry)
492    }
493
494    pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
495        let session_id = Self::session_id_from_i32(session_id)?;
496        self.peer_id_for_session(session_id)
497    }
498
499    pub fn register_peer(&mut self, peer_id: PeerId) -> io::Result<SessionId> {
500        if let Some(existing) = self.session_id_for_peer(peer_id) {
501            return Ok(existing);
502        }
503
504        let session_id = self.allocate_session_id()?;
505        self.peer_to_session.insert(peer_id, session_id);
506        self.session_to_peer.insert(session_id, peer_id);
507        Ok(session_id)
508    }
509
510    pub fn unregister_peer(&mut self, peer_id: PeerId) -> Option<SessionId> {
511        let (_, session_id) = self.peer_to_session.remove(&peer_id)?;
512        self.session_to_peer.remove(&session_id);
513        Some(session_id)
514    }
515
516    pub fn clear(&mut self) {
517        self.peer_to_session.clear();
518        self.session_to_peer.clear();
519        self.next_session_id = 1;
520    }
521
522    pub fn session_id_to_i32(session_id: SessionId) -> Option<i32> {
523        i32::try_from(session_id).ok()
524    }
525
526    pub fn session_id_from_i32(session_id: i32) -> Option<SessionId> {
527        u32::try_from(session_id).ok()
528    }
529
530    fn allocate_session_id(&mut self) -> io::Result<SessionId> {
531        let mut candidate = if self.next_session_id == 0 {
532            1
533        } else {
534            self.next_session_id
535        };
536
537        for _ in 0..u32::MAX {
538            if !self.session_to_peer.contains_key(&candidate) {
539                self.next_session_id = candidate.wrapping_add(1);
540                if self.next_session_id == 0 {
541                    self.next_session_id = 1;
542                }
543                return Ok(candidate);
544            }
545
546            candidate = candidate.wrapping_add(1);
547            if candidate == 0 {
548                candidate = 1;
549            }
550        }
551
552        Err(io::Error::other("session id space exhausted"))
553    }
554}
555
556pub trait SessionFacadeHandler {
557    fn on_connect<'a>(
558        &'a mut self,
559        _session_id: SessionId,
560        _addr: IpAddr,
561        _port: u16,
562        _client_guid: u64,
563    ) -> ServerHookFuture<'a> {
564        Box::pin(async { Ok(()) })
565    }
566
567    fn on_disconnect<'a>(
568        &'a mut self,
569        _session_id: SessionId,
570        _reason: PeerDisconnectReason,
571    ) -> ServerHookFuture<'a> {
572        Box::pin(async { Ok(()) })
573    }
574
575    fn on_packet<'a>(
576        &'a mut self,
577        _session_id: SessionId,
578        _payload: Bytes,
579    ) -> ServerHookFuture<'a> {
580        Box::pin(async { Ok(()) })
581    }
582
583    fn on_ack<'a>(&'a mut self, _session_id: SessionId, _receipt_id: u64) -> ServerHookFuture<'a> {
584        Box::pin(async { Ok(()) })
585    }
586
587    fn on_metrics<'a>(
588        &'a mut self,
589        _shard_id: usize,
590        _snapshot: TransportMetricsSnapshot,
591        _dropped_non_critical_events: u64,
592    ) -> ServerHookFuture<'a> {
593        Box::pin(async { Ok(()) })
594    }
595}
596
597pub async fn dispatch_session_facade<H: SessionFacadeHandler>(
598    adapter: &mut SessionIdAdapter,
599    handler: &mut H,
600    event: RaknetServerEvent,
601) -> io::Result<()> {
602    match event {
603        RaknetServerEvent::PeerConnected {
604            peer_id,
605            addr,
606            client_guid,
607            ..
608        } => {
609            let session_id = adapter.register_peer(peer_id)?;
610            handler
611                .on_connect(session_id, addr.ip(), addr.port(), client_guid)
612                .await?;
613        }
614        RaknetServerEvent::PeerDisconnected {
615            peer_id, reason, ..
616        } => {
617            if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
618                handler.on_disconnect(session_id, reason).await?;
619                adapter.unregister_peer(peer_id);
620            } else {
621                debug!(
622                    peer_id = peer_id.as_u64(),
623                    ?reason,
624                    "ignoring disconnect for unknown session id mapping"
625                );
626            }
627        }
628        RaknetServerEvent::Packet {
629            peer_id, payload, ..
630        } => {
631            if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
632                handler.on_packet(session_id, payload).await?;
633            } else {
634                debug!(
635                    peer_id = peer_id.as_u64(),
636                    "dropping packet callback because session id mapping is missing"
637                );
638            }
639        }
640        RaknetServerEvent::ReceiptAcked {
641            peer_id,
642            receipt_id,
643            ..
644        } => {
645            if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
646                handler.on_ack(session_id, receipt_id).await?;
647            } else {
648                debug!(
649                    peer_id = peer_id.as_u64(),
650                    receipt_id, "dropping ack callback because session id mapping is missing"
651                );
652            }
653        }
654        RaknetServerEvent::Metrics {
655            shard_id,
656            snapshot,
657            dropped_non_critical_events,
658        } => {
659            handler
660                .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
661                .await?;
662        }
663        RaknetServerEvent::OfflinePacket { .. }
664        | RaknetServerEvent::PeerRateLimited { .. }
665        | RaknetServerEvent::SessionLimitReached { .. }
666        | RaknetServerEvent::ProxyDropped { .. }
667        | RaknetServerEvent::DecodeError { .. }
668        | RaknetServerEvent::WorkerError { .. }
669        | RaknetServerEvent::WorkerStopped { .. } => {}
670    }
671
672    Ok(())
673}
674
675pub struct SessionFacade<'a, H: SessionFacadeHandler> {
676    server: &'a mut RaknetServer,
677    handler: &'a mut H,
678    adapter: SessionIdAdapter,
679}
680
681impl<'a, H: SessionFacadeHandler> SessionFacade<'a, H> {
682    pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
683        Self {
684            server,
685            handler,
686            adapter: SessionIdAdapter::new(),
687        }
688    }
689
690    pub fn with_adapter(
691        server: &'a mut RaknetServer,
692        handler: &'a mut H,
693        adapter: SessionIdAdapter,
694    ) -> Self {
695        Self {
696            server,
697            handler,
698            adapter,
699        }
700    }
701
702    pub async fn next(&mut self) -> io::Result<bool> {
703        let Some(event) = self.server.next_event().await else {
704            return Ok(false);
705        };
706        self.dispatch(event).await?;
707        Ok(true)
708    }
709
710    pub async fn run(&mut self) -> io::Result<()> {
711        while self.next().await? {}
712        Ok(())
713    }
714
715    pub fn server(&self) -> &RaknetServer {
716        self.server
717    }
718
719    pub fn server_mut(&mut self) -> &mut RaknetServer {
720        self.server
721    }
722
723    pub fn handler(&self) -> &H {
724        self.handler
725    }
726
727    pub fn handler_mut(&mut self) -> &mut H {
728        self.handler
729    }
730
731    pub fn adapter(&self) -> &SessionIdAdapter {
732        &self.adapter
733    }
734
735    pub fn adapter_mut(&mut self) -> &mut SessionIdAdapter {
736        &mut self.adapter
737    }
738
739    pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
740        self.adapter.session_id_for_peer(peer_id)
741    }
742
743    pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
744        self.adapter.peer_id_for_session(session_id)
745    }
746
747    pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
748        self.adapter.peer_id_for_session_i32(session_id)
749    }
750
751    pub async fn send(
752        &mut self,
753        session_id: SessionId,
754        payload: impl Into<Bytes>,
755    ) -> io::Result<()> {
756        let peer_id = self.resolve_peer_id(session_id)?;
757        self.server.send(peer_id, payload).await
758    }
759
760    pub async fn send_with_options(
761        &mut self,
762        session_id: SessionId,
763        payload: impl Into<Bytes>,
764        options: SendOptions,
765    ) -> io::Result<()> {
766        let peer_id = self.resolve_peer_id(session_id)?;
767        self.server
768            .send_with_options(peer_id, payload, options)
769            .await
770    }
771
772    pub async fn send_with_receipt(
773        &mut self,
774        session_id: SessionId,
775        payload: impl Into<Bytes>,
776        receipt_id: u64,
777    ) -> io::Result<()> {
778        let peer_id = self.resolve_peer_id(session_id)?;
779        self.server
780            .send_with_receipt(peer_id, payload, receipt_id)
781            .await
782    }
783
784    pub async fn disconnect(&mut self, session_id: SessionId) -> io::Result<()> {
785        let peer_id = self.resolve_peer_id(session_id)?;
786        self.server.disconnect(peer_id).await
787    }
788
789    async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
790        dispatch_session_facade(&mut self.adapter, self.handler, event).await
791    }
792
793    fn resolve_peer_id(&self, session_id: SessionId) -> io::Result<PeerId> {
794        self.peer_id_for_session(session_id).ok_or_else(|| {
795            io::Error::new(
796                io::ErrorKind::NotFound,
797                format!("session id {session_id} is not mapped to any peer"),
798            )
799        })
800    }
801}
802
803#[derive(Debug, Clone, Default)]
804pub struct RaknetServerBuilder {
805    transport_config: TransportConfig,
806    runtime_config: ShardedRuntimeConfig,
807}
808
809impl RaknetServerBuilder {
810    pub fn transport_config(mut self, config: TransportConfig) -> Self {
811        self.transport_config = config;
812        self
813    }
814
815    pub fn runtime_config(mut self, config: ShardedRuntimeConfig) -> Self {
816        self.runtime_config = config;
817        self
818    }
819
820    pub fn bind_addr(mut self, bind_addr: SocketAddr) -> Self {
821        self.transport_config.bind_addr = bind_addr;
822        self
823    }
824
825    pub fn shard_count(mut self, shard_count: usize) -> Self {
826        self.runtime_config.shard_count = shard_count.max(1);
827        self
828    }
829
830    pub fn transport_config_mut(&mut self) -> &mut TransportConfig {
831        &mut self.transport_config
832    }
833
834    pub fn runtime_config_mut(&mut self) -> &mut ShardedRuntimeConfig {
835        &mut self.runtime_config
836    }
837
838    pub async fn start(self) -> io::Result<RaknetServer> {
839        self.transport_config
840            .validate()
841            .map_err(invalid_config_io_error)?;
842        self.runtime_config
843            .validate()
844            .map_err(invalid_config_io_error)?;
845        RaknetServer::start_with_configs(self.transport_config, self.runtime_config).await
846    }
847}
848
849#[derive(Debug, Clone, Copy)]
850struct PeerBinding {
851    peer_id: PeerId,
852    shard_id: usize,
853}
854
855pub struct RaknetServer {
856    runtime: ShardedRuntimeHandle,
857    peers_by_addr: FastMap<SocketAddr, PeerBinding>,
858    addrs_by_peer: FastMap<PeerId, SocketAddr>,
859    pending_events: VecDeque<RaknetServerEvent>,
860    next_peer_id: u64,
861}
862
863impl RaknetServer {
864    pub fn builder() -> RaknetServerBuilder {
865        RaknetServerBuilder::default()
866    }
867
868    pub async fn bind(bind_addr: SocketAddr) -> io::Result<Self> {
869        Self::builder().bind_addr(bind_addr).start().await
870    }
871
872    pub fn facade(&mut self) -> ServerFacade<'_> {
873        ServerFacade::new(self)
874    }
875
876    pub fn event_facade<'a, H: EventFacadeHandler>(
877        &'a mut self,
878        handler: &'a mut H,
879    ) -> EventFacade<'a, H> {
880        EventFacade::new(self, handler)
881    }
882
883    pub fn session_facade<'a, H: SessionFacadeHandler>(
884        &'a mut self,
885        handler: &'a mut H,
886    ) -> SessionFacade<'a, H> {
887        SessionFacade::new(self, handler)
888    }
889
890    pub async fn start_with_configs(
891        transport_config: TransportConfig,
892        runtime_config: ShardedRuntimeConfig,
893    ) -> io::Result<Self> {
894        transport_config
895            .validate()
896            .map_err(invalid_config_io_error)?;
897        runtime_config.validate().map_err(invalid_config_io_error)?;
898        let runtime = spawn_sharded_runtime(transport_config, runtime_config).await?;
899        Ok(Self {
900            runtime,
901            peers_by_addr: fast_map(),
902            addrs_by_peer: fast_map(),
903            pending_events: VecDeque::new(),
904            next_peer_id: 1,
905        })
906    }
907
908    pub fn peer_addr(&self, peer_id: PeerId) -> Option<SocketAddr> {
909        self.addrs_by_peer.get(&peer_id).map(|addr| *addr)
910    }
911
912    pub fn peer_shard(&self, peer_id: PeerId) -> Option<usize> {
913        let addr = self.addrs_by_peer.get(&peer_id).map(|addr| *addr)?;
914        self.peers_by_addr
915            .get(&addr)
916            .map(|binding| binding.shard_id)
917    }
918
919    pub fn peer_id_for_addr(&self, addr: SocketAddr) -> Option<PeerId> {
920        self.peers_by_addr.get(&addr).map(|binding| binding.peer_id)
921    }
922
923    pub async fn send(&self, peer_id: PeerId, payload: impl Into<Bytes>) -> io::Result<()> {
924        self.send_with_options(peer_id, payload, SendOptions::default())
925            .await
926    }
927
928    pub async fn send_with_options(
929        &self,
930        peer_id: PeerId,
931        payload: impl Into<Bytes>,
932        options: SendOptions,
933    ) -> io::Result<()> {
934        let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
935
936        self.runtime
937            .send_payload_to_shard(
938                shard_id,
939                ShardedSendPayload {
940                    addr,
941                    payload: payload.into(),
942                    reliability: options.reliability,
943                    channel: options.channel,
944                    priority: options.priority,
945                },
946            )
947            .await
948    }
949
950    pub async fn send_with_receipt(
951        &self,
952        peer_id: PeerId,
953        payload: impl Into<Bytes>,
954        receipt_id: u64,
955    ) -> io::Result<()> {
956        self.send_with_options_and_receipt(peer_id, payload, SendOptions::default(), receipt_id)
957            .await
958    }
959
960    pub async fn send_with_options_and_receipt(
961        &self,
962        peer_id: PeerId,
963        payload: impl Into<Bytes>,
964        options: SendOptions,
965        receipt_id: u64,
966    ) -> io::Result<()> {
967        let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
968
969        self.runtime
970            .send_payload_to_shard_with_receipt(
971                shard_id,
972                ShardedSendPayload {
973                    addr,
974                    payload: payload.into(),
975                    reliability: options.reliability,
976                    channel: options.channel,
977                    priority: options.priority,
978                },
979                receipt_id,
980            )
981            .await
982    }
983
984    pub async fn disconnect(&mut self, peer_id: PeerId) -> io::Result<()> {
985        let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
986        info!(
987            peer_id = peer_id.as_u64(),
988            %addr,
989            shard_id,
990            "server disconnect requested"
991        );
992
993        self.runtime
994            .disconnect_peer_from_shard(shard_id, addr)
995            .await?;
996        self.remove_peer(addr);
997        self.pending_events
998            .push_back(RaknetServerEvent::PeerDisconnected {
999                peer_id,
1000                addr,
1001                reason: PeerDisconnectReason::Requested,
1002            });
1003        Ok(())
1004    }
1005
1006    pub async fn next_event(&mut self) -> Option<RaknetServerEvent> {
1007        if let Some(event) = self.pending_events.pop_front() {
1008            return Some(event);
1009        }
1010
1011        loop {
1012            let runtime_event = self.runtime.event_rx.recv().await?;
1013            self.enqueue_runtime_event(runtime_event);
1014            if let Some(event) = self.pending_events.pop_front() {
1015                return Some(event);
1016            }
1017        }
1018    }
1019
1020    pub async fn shutdown(self) -> io::Result<()> {
1021        self.runtime.shutdown().await
1022    }
1023
1024    fn enqueue_runtime_event(&mut self, runtime_event: ShardedRuntimeEvent) {
1025        match runtime_event {
1026            ShardedRuntimeEvent::Transport { shard_id, event } => match event {
1027                TransportEvent::PeerDisconnected { addr, reason } => {
1028                    if let Some(peer_id) = self.remove_peer(addr) {
1029                        let reason = match reason {
1030                            RemoteDisconnectReason::DisconnectionNotification { reason_code } => {
1031                                PeerDisconnectReason::RemoteDisconnectionNotification {
1032                                    reason_code,
1033                                }
1034                            }
1035                            RemoteDisconnectReason::DetectLostConnection => {
1036                                PeerDisconnectReason::RemoteDetectLostConnection
1037                            }
1038                        };
1039                        info!(
1040                            peer_id = peer_id.as_u64(),
1041                            %addr,
1042                            ?reason,
1043                            "peer disconnected"
1044                        );
1045                        self.pending_events
1046                            .push_back(RaknetServerEvent::PeerDisconnected {
1047                                peer_id,
1048                                addr,
1049                                reason,
1050                            });
1051                    } else {
1052                        debug!(
1053                            %addr,
1054                            ?reason,
1055                            "received peer disconnect for unknown address"
1056                        );
1057                    }
1058                }
1059                TransportEvent::ConnectedFrames {
1060                    addr,
1061                    client_guid,
1062                    frames,
1063                    receipts,
1064                    ..
1065                } => {
1066                    let has_frames = !frames.is_empty();
1067                    let has_receipts = !receipts.acked_receipt_ids.is_empty();
1068
1069                    if client_guid.is_none() && !has_frames && !has_receipts {
1070                        debug!(
1071                            %addr,
1072                            shard_id,
1073                            "ignoring pre-connect transport event without frames/receipts"
1074                        );
1075                    } else {
1076                        let (peer_id, is_new) = self.ensure_peer(addr, shard_id);
1077                        if is_new {
1078                            let client_guid = client_guid.unwrap_or(peer_id.as_u64());
1079                            info!(
1080                                peer_id = peer_id.as_u64(),
1081                                %addr,
1082                                client_guid,
1083                                shard_id,
1084                                "peer connected"
1085                            );
1086                            self.pending_events
1087                                .push_back(RaknetServerEvent::PeerConnected {
1088                                    peer_id,
1089                                    addr,
1090                                    client_guid,
1091                                    shard_id,
1092                                });
1093                        }
1094
1095                        for frame in frames {
1096                            self.pending_events.push_back(RaknetServerEvent::Packet {
1097                                peer_id,
1098                                addr,
1099                                payload: frame.payload,
1100                                reliability: frame.reliability,
1101                                reliable_index: frame.reliable_index,
1102                                sequence_index: frame.sequence_index,
1103                                ordering_index: frame.ordering_index,
1104                                ordering_channel: frame.ordering_channel,
1105                            });
1106                        }
1107
1108                        for receipt_id in receipts.acked_receipt_ids {
1109                            self.pending_events
1110                                .push_back(RaknetServerEvent::ReceiptAcked {
1111                                    peer_id,
1112                                    addr,
1113                                    receipt_id,
1114                                });
1115                        }
1116                    }
1117                }
1118                TransportEvent::RateLimited { addr } => {
1119                    warn!(%addr, "peer rate-limited");
1120                    self.pending_events
1121                        .push_back(RaknetServerEvent::PeerRateLimited { addr });
1122                }
1123                TransportEvent::SessionLimitReached { addr } => {
1124                    warn!(%addr, "session limit reached");
1125                    self.pending_events
1126                        .push_back(RaknetServerEvent::SessionLimitReached { addr });
1127                }
1128                TransportEvent::ConnectedDatagramDroppedNoSession { .. } => {}
1129                TransportEvent::ProxyDropped { addr } => {
1130                    debug!(%addr, "proxy router dropped packet");
1131                    self.pending_events
1132                        .push_back(RaknetServerEvent::ProxyDropped { addr });
1133                }
1134                TransportEvent::DecodeError { addr, error } => {
1135                    warn!(%addr, %error, "transport decode error");
1136                    self.pending_events
1137                        .push_back(RaknetServerEvent::DecodeError {
1138                            addr,
1139                            error: error.to_string(),
1140                        });
1141                }
1142                TransportEvent::OfflinePacket { addr, packet } => {
1143                    self.pending_events
1144                        .push_back(RaknetServerEvent::OfflinePacket { addr, packet });
1145                }
1146            },
1147            ShardedRuntimeEvent::Metrics {
1148                shard_id,
1149                snapshot,
1150                dropped_non_critical_events,
1151            } => {
1152                if dropped_non_critical_events > 0 {
1153                    debug!(
1154                        shard_id,
1155                        dropped_non_critical_events,
1156                        "non-critical runtime events were dropped before metrics emit"
1157                    );
1158                }
1159                self.pending_events.push_back(RaknetServerEvent::Metrics {
1160                    shard_id,
1161                    snapshot,
1162                    dropped_non_critical_events,
1163                });
1164            }
1165            ShardedRuntimeEvent::WorkerError { shard_id, message } => {
1166                warn!(shard_id, %message, "runtime worker error");
1167                self.pending_events
1168                    .push_back(RaknetServerEvent::WorkerError { shard_id, message });
1169            }
1170            ShardedRuntimeEvent::WorkerStopped { shard_id } => {
1171                warn!(shard_id, "runtime worker stopped");
1172                let mut disconnected = Vec::new();
1173                for peer in self.peers_by_addr.iter() {
1174                    let addr = *peer.key();
1175                    let binding = *peer.value();
1176                    if binding.shard_id == shard_id {
1177                        disconnected.push((addr, binding.peer_id));
1178                    }
1179                }
1180                for (addr, peer_id) in disconnected {
1181                    self.remove_peer(addr);
1182                    info!(
1183                        peer_id = peer_id.as_u64(),
1184                        %addr,
1185                        shard_id,
1186                        "peer disconnected because worker stopped"
1187                    );
1188                    self.pending_events
1189                        .push_back(RaknetServerEvent::PeerDisconnected {
1190                            peer_id,
1191                            addr,
1192                            reason: PeerDisconnectReason::WorkerStopped { shard_id },
1193                        });
1194                }
1195                self.pending_events
1196                    .push_back(RaknetServerEvent::WorkerStopped { shard_id });
1197            }
1198        }
1199    }
1200
1201    fn ensure_peer(&mut self, addr: SocketAddr, shard_id: usize) -> (PeerId, bool) {
1202        if let Some(mut binding) = self.peers_by_addr.get_mut(&addr) {
1203            if binding.shard_id != shard_id {
1204                binding.shard_id = shard_id;
1205            }
1206            return (binding.peer_id, false);
1207        }
1208
1209        let peer_id = PeerId(self.next_peer_id);
1210        self.next_peer_id = self.next_peer_id.saturating_add(1);
1211        self.peers_by_addr
1212            .insert(addr, PeerBinding { peer_id, shard_id });
1213        self.addrs_by_peer.insert(peer_id, addr);
1214        (peer_id, true)
1215    }
1216
1217    fn remove_peer(&mut self, addr: SocketAddr) -> Option<PeerId> {
1218        let (_, binding) = self.peers_by_addr.remove(&addr)?;
1219        self.addrs_by_peer.remove(&binding.peer_id);
1220        Some(binding.peer_id)
1221    }
1222
1223    fn resolve_peer_route(&self, peer_id: PeerId) -> io::Result<(SocketAddr, usize)> {
1224        let addr = self
1225            .addrs_by_peer
1226            .get(&peer_id)
1227            .map(|entry| *entry)
1228            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer id not found"))?;
1229        let shard_id = self
1230            .peers_by_addr
1231            .get(&addr)
1232            .map(|binding| binding.shard_id)
1233            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer shard binding missing"))?;
1234        Ok((addr, shard_id))
1235    }
1236}
1237
1238fn invalid_config_io_error(error: ConfigValidationError) -> io::Error {
1239    io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
1240}
1241
1242#[cfg(test)]
1243mod tests {
1244    use super::{
1245        EventFacadeHandler, PeerDisconnectReason, PeerId, RaknetServer, RaknetServerBuilder,
1246        RaknetServerEvent, ServerHookFuture, SessionFacadeHandler, SessionId, SessionIdAdapter,
1247        dispatch_event_facade, dispatch_session_facade,
1248    };
1249    use crate::protocol::reliability::Reliability;
1250    use crate::transport::{ShardedRuntimeConfig, TransportConfig, TransportMetricsSnapshot};
1251    use bytes::Bytes;
1252    use std::io;
1253    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1254
/// The shard count passed to the builder must end up in `runtime_config`.
#[test]
fn builder_mutators_keep_values() {
    let requested_shards = 4;
    let configured = RaknetServerBuilder::default().shard_count(requested_shards);
    assert_eq!(configured.runtime_config.shard_count, requested_shards);
}
1260
/// `PeerId::from_u64` followed by `as_u64` must return the original value.
#[test]
fn peer_id_roundtrip() {
    let raw = 42;
    let wrapped = PeerId::from_u64(raw);
    assert_eq!(wrapped.as_u64(), raw);
}
1266
/// Compile-level smoke test: `RaknetServer::builder()` must be callable.
#[test]
fn builder_type_is_exposed() {
    // The builder itself is not inspected; constructing it is the whole test.
    drop(RaknetServer::builder());
}
1271
1272    #[tokio::test]
1273    async fn start_with_invalid_runtime_config_fails_fast() {
1274        let transport = TransportConfig {
1275            bind_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1276            ..TransportConfig::default()
1277        };
1278        let runtime = ShardedRuntimeConfig {
1279            shard_count: 0,
1280            ..ShardedRuntimeConfig::default()
1281        };
1282
1283        match RaknetServer::start_with_configs(transport, runtime).await {
1284            Ok(_) => panic!("invalid config must fail before runtime start"),
1285            Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
1286        }
1287    }
1288
1289    #[derive(Default)]
1290    struct CountingEventHandler {
1291        connect_calls: usize,
1292        disconnect_calls: usize,
1293        packet_calls: usize,
1294        ack_calls: usize,
1295        metrics_calls: usize,
1296        last_connect: Option<(u64, IpAddr, u16, u64)>,
1297        last_disconnect: Option<(u64, PeerDisconnectReason)>,
1298        last_packet: Option<(u64, Bytes)>,
1299        last_ack: Option<(u64, u64)>,
1300        last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1301    }
1302
1303    impl EventFacadeHandler for CountingEventHandler {
1304        fn on_connect<'a>(
1305            &'a mut self,
1306            session_id: u64,
1307            addr: IpAddr,
1308            port: u16,
1309            client_guid: u64,
1310        ) -> ServerHookFuture<'a> {
1311            self.connect_calls = self.connect_calls.saturating_add(1);
1312            self.last_connect = Some((session_id, addr, port, client_guid));
1313            Box::pin(async { Ok(()) })
1314        }
1315
1316        fn on_disconnect<'a>(
1317            &'a mut self,
1318            session_id: u64,
1319            reason: PeerDisconnectReason,
1320        ) -> ServerHookFuture<'a> {
1321            self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1322            self.last_disconnect = Some((session_id, reason));
1323            Box::pin(async { Ok(()) })
1324        }
1325
1326        fn on_packet<'a>(&'a mut self, session_id: u64, payload: Bytes) -> ServerHookFuture<'a> {
1327            self.packet_calls = self.packet_calls.saturating_add(1);
1328            self.last_packet = Some((session_id, payload));
1329            Box::pin(async { Ok(()) })
1330        }
1331
1332        fn on_ack<'a>(&'a mut self, session_id: u64, receipt_id: u64) -> ServerHookFuture<'a> {
1333            self.ack_calls = self.ack_calls.saturating_add(1);
1334            self.last_ack = Some((session_id, receipt_id));
1335            Box::pin(async { Ok(()) })
1336        }
1337
1338        fn on_metrics<'a>(
1339            &'a mut self,
1340            shard_id: usize,
1341            snapshot: TransportMetricsSnapshot,
1342            dropped_non_critical_events: u64,
1343        ) -> ServerHookFuture<'a> {
1344            self.metrics_calls = self.metrics_calls.saturating_add(1);
1345            self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1346            Box::pin(async { Ok(()) })
1347        }
1348    }
1349
1350    #[tokio::test]
1351    async fn dispatch_event_facade_maps_callbacks() -> io::Result<()> {
1352        let mut handler = CountingEventHandler::default();
1353        let peer_id = PeerId::from_u64(77);
1354        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)), 19132);
1355        let payload = Bytes::from_static(b"\x01\x02event");
1356        let metrics = TransportMetricsSnapshot {
1357            packets_forwarded_total: 33,
1358            bytes_forwarded_total: 1200,
1359            ..TransportMetricsSnapshot::default()
1360        };
1361
1362        dispatch_event_facade(
1363            &mut handler,
1364            RaknetServerEvent::PeerConnected {
1365                peer_id,
1366                addr,
1367                client_guid: 0xAABB_CCDD_EEFF_0011,
1368                shard_id: 2,
1369            },
1370        )
1371        .await?;
1372
1373        dispatch_event_facade(
1374            &mut handler,
1375            RaknetServerEvent::Packet {
1376                peer_id,
1377                addr,
1378                payload: payload.clone(),
1379                reliability: Reliability::ReliableOrdered,
1380                reliable_index: None,
1381                sequence_index: None,
1382                ordering_index: None,
1383                ordering_channel: None,
1384            },
1385        )
1386        .await?;
1387
1388        dispatch_event_facade(
1389            &mut handler,
1390            RaknetServerEvent::ReceiptAcked {
1391                peer_id,
1392                addr,
1393                receipt_id: 9001,
1394            },
1395        )
1396        .await?;
1397
1398        dispatch_event_facade(
1399            &mut handler,
1400            RaknetServerEvent::Metrics {
1401                shard_id: 2,
1402                snapshot: Box::new(metrics),
1403                dropped_non_critical_events: 5,
1404            },
1405        )
1406        .await?;
1407
1408        dispatch_event_facade(
1409            &mut handler,
1410            RaknetServerEvent::PeerDisconnected {
1411                peer_id,
1412                addr,
1413                reason: PeerDisconnectReason::Requested,
1414            },
1415        )
1416        .await?;
1417
1418        dispatch_event_facade(
1419            &mut handler,
1420            RaknetServerEvent::WorkerStopped { shard_id: 0 },
1421        )
1422        .await?;
1423
1424        assert_eq!(handler.connect_calls, 1);
1425        assert_eq!(handler.packet_calls, 1);
1426        assert_eq!(handler.ack_calls, 1);
1427        assert_eq!(handler.metrics_calls, 1);
1428        assert_eq!(handler.disconnect_calls, 1);
1429        assert_eq!(
1430            handler.last_connect,
1431            Some((77, addr.ip(), addr.port(), 0xAABB_CCDD_EEFF_0011))
1432        );
1433        assert_eq!(handler.last_packet, Some((77, payload)));
1434        assert_eq!(handler.last_ack, Some((77, 9001)));
1435        let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1436            .last_metrics
1437            .expect("metrics callback should store last snapshot");
1438        assert_eq!(metrics_shard, 2);
1439        assert_eq!(metrics_dropped, 5);
1440        assert_eq!(metrics_snapshot.packets_forwarded_total, 33);
1441        assert_eq!(metrics_snapshot.bytes_forwarded_total, 1200);
1442        assert_eq!(
1443            handler.last_disconnect,
1444            Some((77, PeerDisconnectReason::Requested))
1445        );
1446
1447        Ok(())
1448    }
1449
1450    #[derive(Default)]
1451    struct CountingSessionHandler {
1452        connect_calls: usize,
1453        disconnect_calls: usize,
1454        packet_calls: usize,
1455        ack_calls: usize,
1456        metrics_calls: usize,
1457        last_connect: Option<(SessionId, IpAddr, u16, u64)>,
1458        last_disconnect: Option<(SessionId, PeerDisconnectReason)>,
1459        last_packet: Option<(SessionId, Bytes)>,
1460        last_ack: Option<(SessionId, u64)>,
1461        last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1462    }
1463
1464    impl SessionFacadeHandler for CountingSessionHandler {
1465        fn on_connect<'a>(
1466            &'a mut self,
1467            session_id: SessionId,
1468            addr: IpAddr,
1469            port: u16,
1470            client_guid: u64,
1471        ) -> ServerHookFuture<'a> {
1472            self.connect_calls = self.connect_calls.saturating_add(1);
1473            self.last_connect = Some((session_id, addr, port, client_guid));
1474            Box::pin(async { Ok(()) })
1475        }
1476
1477        fn on_disconnect<'a>(
1478            &'a mut self,
1479            session_id: SessionId,
1480            reason: PeerDisconnectReason,
1481        ) -> ServerHookFuture<'a> {
1482            self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1483            self.last_disconnect = Some((session_id, reason));
1484            Box::pin(async { Ok(()) })
1485        }
1486
1487        fn on_packet<'a>(
1488            &'a mut self,
1489            session_id: SessionId,
1490            payload: Bytes,
1491        ) -> ServerHookFuture<'a> {
1492            self.packet_calls = self.packet_calls.saturating_add(1);
1493            self.last_packet = Some((session_id, payload));
1494            Box::pin(async { Ok(()) })
1495        }
1496
1497        fn on_ack<'a>(
1498            &'a mut self,
1499            session_id: SessionId,
1500            receipt_id: u64,
1501        ) -> ServerHookFuture<'a> {
1502            self.ack_calls = self.ack_calls.saturating_add(1);
1503            self.last_ack = Some((session_id, receipt_id));
1504            Box::pin(async { Ok(()) })
1505        }
1506
1507        fn on_metrics<'a>(
1508            &'a mut self,
1509            shard_id: usize,
1510            snapshot: TransportMetricsSnapshot,
1511            dropped_non_critical_events: u64,
1512        ) -> ServerHookFuture<'a> {
1513            self.metrics_calls = self.metrics_calls.saturating_add(1);
1514            self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1515            Box::pin(async { Ok(()) })
1516        }
1517    }
1518
/// Exercises `SessionIdAdapter`: sequential allocation starting at 1, lookups
/// in both directions, the `i32` conversion helpers, and release of a mapping
/// on unregister.
#[test]
fn session_id_adapter_bridges_peer_and_signed_ids() {
    let mut adapter = SessionIdAdapter::new();
    let first_peer = PeerId::from_u64(0x1_0000_0001);
    let second_peer = PeerId::from_u64(0x2_0000_0002);

    let first_session = adapter
        .register_peer(first_peer)
        .expect("first session id allocation should succeed");
    let second_session = adapter
        .register_peer(second_peer)
        .expect("second session id allocation should succeed");

    // Session ids are handed out in registration order.
    assert_eq!(first_session, 1);
    assert_eq!(second_session, 2);

    // Lookups work in both directions, including through the i32 bridge.
    assert_eq!(adapter.session_id_for_peer(first_peer), Some(first_session));
    assert_eq!(
        adapter.peer_id_for_session(second_session),
        Some(second_peer)
    );
    assert_eq!(adapter.peer_id_for_session_i32(2), Some(second_peer));
    assert_eq!(adapter.peer_id_for_session_i32(-1), None);

    // Negative values are rejected by the i32 conversion helpers.
    assert_eq!(SessionIdAdapter::session_id_to_i32(first_session), Some(1));
    assert_eq!(SessionIdAdapter::session_id_from_i32(2), Some(2));
    assert_eq!(SessionIdAdapter::session_id_from_i32(-5), None);

    // Unregistering releases the mapping in both directions.
    assert_eq!(adapter.unregister_peer(first_peer), Some(first_session));
    assert_eq!(adapter.session_id_for_peer(first_peer), None);
    assert_eq!(adapter.peer_id_for_session(first_session), None);
}
1546
1547    #[tokio::test]
1548    async fn dispatch_session_facade_maps_callbacks_and_releases_mapping() -> io::Result<()> {
1549        let mut adapter = SessionIdAdapter::new();
1550        let mut handler = CountingSessionHandler::default();
1551        let peer_id = PeerId::from_u64(0xDEAD_BEEF_F00D);
1552        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 1, 2, 3)), 19133);
1553        let payload = Bytes::from_static(b"\x10\x20session");
1554        let metrics = TransportMetricsSnapshot {
1555            packets_forwarded_total: 5,
1556            bytes_forwarded_total: 80,
1557            ..TransportMetricsSnapshot::default()
1558        };
1559
1560        dispatch_session_facade(
1561            &mut adapter,
1562            &mut handler,
1563            RaknetServerEvent::PeerConnected {
1564                peer_id,
1565                addr,
1566                client_guid: 0xABCD_EF01_0203_0405,
1567                shard_id: 0,
1568            },
1569        )
1570        .await?;
1571
1572        let session_id = adapter
1573            .session_id_for_peer(peer_id)
1574            .expect("session id should be registered after connect");
1575        assert_eq!(session_id, 1);
1576
1577        dispatch_session_facade(
1578            &mut adapter,
1579            &mut handler,
1580            RaknetServerEvent::Packet {
1581                peer_id,
1582                addr,
1583                payload: payload.clone(),
1584                reliability: Reliability::ReliableOrdered,
1585                reliable_index: None,
1586                sequence_index: None,
1587                ordering_index: None,
1588                ordering_channel: None,
1589            },
1590        )
1591        .await?;
1592
1593        dispatch_session_facade(
1594            &mut adapter,
1595            &mut handler,
1596            RaknetServerEvent::ReceiptAcked {
1597                peer_id,
1598                addr,
1599                receipt_id: 44,
1600            },
1601        )
1602        .await?;
1603
1604        dispatch_session_facade(
1605            &mut adapter,
1606            &mut handler,
1607            RaknetServerEvent::Metrics {
1608                shard_id: 0,
1609                snapshot: Box::new(metrics),
1610                dropped_non_critical_events: 7,
1611            },
1612        )
1613        .await?;
1614
1615        dispatch_session_facade(
1616            &mut adapter,
1617            &mut handler,
1618            RaknetServerEvent::PeerDisconnected {
1619                peer_id,
1620                addr,
1621                reason: PeerDisconnectReason::Requested,
1622            },
1623        )
1624        .await?;
1625
1626        assert_eq!(handler.connect_calls, 1);
1627        assert_eq!(handler.packet_calls, 1);
1628        assert_eq!(handler.ack_calls, 1);
1629        assert_eq!(handler.metrics_calls, 1);
1630        assert_eq!(handler.disconnect_calls, 1);
1631        assert_eq!(
1632            handler.last_connect,
1633            Some((1, addr.ip(), addr.port(), 0xABCD_EF01_0203_0405))
1634        );
1635        assert_eq!(handler.last_packet, Some((1, payload)));
1636        assert_eq!(handler.last_ack, Some((1, 44)));
1637        assert_eq!(
1638            handler.last_disconnect,
1639            Some((1, PeerDisconnectReason::Requested))
1640        );
1641        assert_eq!(adapter.session_id_for_peer(peer_id), None);
1642        assert_eq!(adapter.peer_id_for_session(1), None);
1643
1644        let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1645            .last_metrics
1646            .expect("metrics callback should store last snapshot");
1647        assert_eq!(metrics_shard, 0);
1648        assert_eq!(metrics_dropped, 7);
1649        assert_eq!(metrics_snapshot.packets_forwarded_total, 5);
1650        assert_eq!(metrics_snapshot.bytes_forwarded_total, 80);
1651
1652        Ok(())
1653    }
1654}