Skip to main content

raknet_rust/
server.rs

1//! High-level server API.
2//!
3//! [`RaknetServer`] provides an event-driven surface over the sharded transport
4//! runtime. For ergonomic integration, use:
5//! - [`ServerFacade`] for closure-based hooks
6//! - [`EventFacade`] for handler-trait hooks
7//! - [`SessionFacade`] for session-id based hooks
8
9use std::collections::VecDeque;
10use std::future::Future;
11use std::io;
12use std::net::{IpAddr, SocketAddr};
13use std::pin::Pin;
14
15use bytes::Bytes;
16use tracing::{debug, info, warn};
17
18use crate::concurrency::{FastMap, fast_map};
19use crate::error::ConfigValidationError;
20use crate::handshake::OfflinePacket;
21use crate::protocol::reliability::Reliability;
22use crate::protocol::sequence24::Sequence24;
23use crate::session::RakPriority;
24use crate::transport::{
25    RemoteDisconnectReason, ShardedRuntimeConfig, ShardedRuntimeEvent, ShardedRuntimeHandle,
26    ShardedSendPayload, TransportConfig, TransportEvent, TransportMetricsSnapshot,
27    spawn_sharded_runtime,
28};
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
31/// Stable server-side identifier assigned to a connected peer.
32pub struct PeerId(u64);
33
34impl PeerId {
35    /// Creates a [`PeerId`] from a raw `u64`.
36    pub const fn from_u64(value: u64) -> Self {
37        Self(value)
38    }
39
40    /// Returns the underlying raw identifier.
41    pub const fn as_u64(self) -> u64 {
42        self.0
43    }
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47/// Send policy used when dispatching payloads to a peer.
48pub struct SendOptions {
49    /// RakNet reliability class for the outgoing frame.
50    pub reliability: Reliability,
51    /// Ordering channel (used by ordered/sequenced reliabilities).
52    pub channel: u8,
53    /// Priority used by the session scheduler.
54    pub priority: RakPriority,
55}
56
57impl Default for SendOptions {
58    fn default() -> Self {
59        Self {
60            reliability: Reliability::ReliableOrdered,
61            channel: 0,
62            priority: RakPriority::High,
63        }
64    }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68/// Reason reported when a peer session is closed.
69pub enum PeerDisconnectReason {
70    Requested,
71    RemoteDisconnectionNotification { reason_code: Option<u8> },
72    RemoteDetectLostConnection,
73    WorkerStopped { shard_id: usize },
74}
75
76#[derive(Debug)]
77/// Event stream produced by [`RaknetServer::next_event`].
78pub enum RaknetServerEvent {
79    PeerConnected {
80        peer_id: PeerId,
81        addr: SocketAddr,
82        client_guid: u64,
83        shard_id: usize,
84    },
85    PeerDisconnected {
86        peer_id: PeerId,
87        addr: SocketAddr,
88        reason: PeerDisconnectReason,
89    },
90    Packet {
91        peer_id: PeerId,
92        addr: SocketAddr,
93        payload: Bytes,
94        reliability: Reliability,
95        reliable_index: Option<Sequence24>,
96        sequence_index: Option<Sequence24>,
97        ordering_index: Option<Sequence24>,
98        ordering_channel: Option<u8>,
99    },
100    OfflinePacket {
101        addr: SocketAddr,
102        packet: OfflinePacket,
103    },
104    ReceiptAcked {
105        peer_id: PeerId,
106        addr: SocketAddr,
107        receipt_id: u64,
108    },
109    PeerRateLimited {
110        addr: SocketAddr,
111    },
112    SessionLimitReached {
113        addr: SocketAddr,
114    },
115    ProxyDropped {
116        addr: SocketAddr,
117    },
118    DecodeError {
119        addr: SocketAddr,
120        error: String,
121    },
122    WorkerError {
123        shard_id: usize,
124        message: String,
125    },
126    WorkerStopped {
127        shard_id: usize,
128    },
129    Metrics {
130        shard_id: usize,
131        snapshot: Box<TransportMetricsSnapshot>,
132        dropped_non_critical_events: u64,
133    },
134}
135
136impl RaknetServerEvent {
137    /// Returns metrics payload for [`RaknetServerEvent::Metrics`], otherwise `None`.
138    pub fn metrics_snapshot(&self) -> Option<(usize, &TransportMetricsSnapshot, u64)> {
139        match self {
140            Self::Metrics {
141                shard_id,
142                snapshot,
143                dropped_non_critical_events,
144            } => Some((*shard_id, snapshot.as_ref(), *dropped_non_critical_events)),
145            _ => None,
146        }
147    }
148}
149
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151/// Hook payload for a newly connected peer.
152pub struct ConnectEvent {
153    pub peer_id: PeerId,
154    pub addr: SocketAddr,
155    pub client_guid: u64,
156    pub shard_id: usize,
157}
158
159#[derive(Debug, Clone)]
160/// Hook payload for an inbound application packet.
161pub struct PacketEvent {
162    pub peer_id: PeerId,
163    pub addr: SocketAddr,
164    pub payload: Bytes,
165    pub reliability: Reliability,
166    pub reliable_index: Option<Sequence24>,
167    pub sequence_index: Option<Sequence24>,
168    pub ordering_index: Option<Sequence24>,
169    pub ordering_channel: Option<u8>,
170}
171
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
173/// Hook payload for a disconnected peer.
174pub struct DisconnectEvent {
175    pub peer_id: PeerId,
176    pub addr: SocketAddr,
177    pub reason: PeerDisconnectReason,
178}
179
180/// Async return type used by server hook callbacks.
181pub type ServerHookFuture<'a> = Pin<Box<dyn Future<Output = io::Result<()>> + Send + 'a>>;
182
183type ConnectHandler =
184    Box<dyn for<'a> FnMut(&'a mut RaknetServer, ConnectEvent) -> ServerHookFuture<'a> + Send>;
185type PacketHandler =
186    Box<dyn for<'a> FnMut(&'a mut RaknetServer, PacketEvent) -> ServerHookFuture<'a> + Send>;
187type DisconnectHandler =
188    Box<dyn for<'a> FnMut(&'a mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'a> + Send>;
189
190/// Closure-based facade over [`RaknetServerEvent`] stream.
191///
192/// Useful when you want to wire small handlers without defining custom handler traits.
193pub struct ServerFacade<'a> {
194    server: &'a mut RaknetServer,
195    on_connect: Option<ConnectHandler>,
196    on_packet: Option<PacketHandler>,
197    on_disconnect: Option<DisconnectHandler>,
198}
199
200impl<'a> ServerFacade<'a> {
201    /// Creates a new facade around a mutable server reference.
202    pub fn new(server: &'a mut RaknetServer) -> Self {
203        Self {
204            server,
205            on_connect: None,
206            on_packet: None,
207            on_disconnect: None,
208        }
209    }
210
211    /// Registers `on_connect` callback.
212    pub fn on_connect<F>(mut self, handler: F) -> Self
213    where
214        F: for<'b> FnMut(&'b mut RaknetServer, ConnectEvent) -> ServerHookFuture<'b>
215            + Send
216            + 'static,
217    {
218        self.on_connect = Some(Box::new(handler));
219        self
220    }
221
222    /// Registers `on_packet` callback.
223    pub fn on_packet<F>(mut self, handler: F) -> Self
224    where
225        F: for<'b> FnMut(&'b mut RaknetServer, PacketEvent) -> ServerHookFuture<'b>
226            + Send
227            + 'static,
228    {
229        self.on_packet = Some(Box::new(handler));
230        self
231    }
232
233    /// Registers `on_disconnect` callback.
234    pub fn on_disconnect<F>(mut self, handler: F) -> Self
235    where
236        F: for<'b> FnMut(&'b mut RaknetServer, DisconnectEvent) -> ServerHookFuture<'b>
237            + Send
238            + 'static,
239    {
240        self.on_disconnect = Some(Box::new(handler));
241        self
242    }
243
244    /// Polls one server event and dispatches to registered callbacks.
245    ///
246    /// Returns `Ok(false)` when the server event stream is closed.
247    pub async fn next(&mut self) -> io::Result<bool> {
248        let Some(event) = self.server.next_event().await else {
249            return Ok(false);
250        };
251        self.dispatch(event).await?;
252        Ok(true)
253    }
254
255    /// Runs the dispatch loop until stream closure or callback error.
256    pub async fn run(&mut self) -> io::Result<()> {
257        while self.next().await? {}
258        Ok(())
259    }
260
261    /// Returns immutable access to wrapped server.
262    pub fn server(&self) -> &RaknetServer {
263        self.server
264    }
265
266    /// Returns mutable access to wrapped server.
267    pub fn server_mut(&mut self) -> &mut RaknetServer {
268        self.server
269    }
270
271    async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
272        match event {
273            RaknetServerEvent::PeerConnected {
274                peer_id,
275                addr,
276                client_guid,
277                shard_id,
278            } => {
279                if let Some(handler) = self.on_connect.as_mut() {
280                    handler(
281                        self.server,
282                        ConnectEvent {
283                            peer_id,
284                            addr,
285                            client_guid,
286                            shard_id,
287                        },
288                    )
289                    .await?;
290                }
291            }
292            RaknetServerEvent::Packet {
293                peer_id,
294                addr,
295                payload,
296                reliability,
297                reliable_index,
298                sequence_index,
299                ordering_index,
300                ordering_channel,
301            } => {
302                if let Some(handler) = self.on_packet.as_mut() {
303                    handler(
304                        self.server,
305                        PacketEvent {
306                            peer_id,
307                            addr,
308                            payload,
309                            reliability,
310                            reliable_index,
311                            sequence_index,
312                            ordering_index,
313                            ordering_channel,
314                        },
315                    )
316                    .await?;
317                }
318            }
319            RaknetServerEvent::PeerDisconnected {
320                peer_id,
321                addr,
322                reason,
323            } => {
324                if let Some(handler) = self.on_disconnect.as_mut() {
325                    handler(
326                        self.server,
327                        DisconnectEvent {
328                            peer_id,
329                            addr,
330                            reason,
331                        },
332                    )
333                    .await?;
334                }
335            }
336            RaknetServerEvent::OfflinePacket { .. }
337            | RaknetServerEvent::ReceiptAcked { .. }
338            | RaknetServerEvent::PeerRateLimited { .. }
339            | RaknetServerEvent::SessionLimitReached { .. }
340            | RaknetServerEvent::ProxyDropped { .. }
341            | RaknetServerEvent::DecodeError { .. }
342            | RaknetServerEvent::WorkerError { .. }
343            | RaknetServerEvent::WorkerStopped { .. }
344            | RaknetServerEvent::Metrics { .. } => {}
345        }
346
347        Ok(())
348    }
349}
350
351/// Event-driven callback surface that mirrors RakLibInterface-style hooks.
352///
353/// This trait is intentionally event-driven and is fed from `RaknetServerEvent`.
354/// All methods are optional and default to no-op.
355pub trait EventFacadeHandler {
356    fn on_connect<'a>(
357        &'a mut self,
358        _session_id: u64,
359        _addr: IpAddr,
360        _port: u16,
361        _client_guid: u64,
362    ) -> ServerHookFuture<'a> {
363        Box::pin(async { Ok(()) })
364    }
365
366    fn on_disconnect<'a>(
367        &'a mut self,
368        _session_id: u64,
369        _reason: PeerDisconnectReason,
370    ) -> ServerHookFuture<'a> {
371        Box::pin(async { Ok(()) })
372    }
373
374    fn on_packet<'a>(&'a mut self, _session_id: u64, _payload: Bytes) -> ServerHookFuture<'a> {
375        Box::pin(async { Ok(()) })
376    }
377
378    fn on_ack<'a>(&'a mut self, _session_id: u64, _receipt_id: u64) -> ServerHookFuture<'a> {
379        Box::pin(async { Ok(()) })
380    }
381
382    fn on_metrics<'a>(
383        &'a mut self,
384        _shard_id: usize,
385        _snapshot: TransportMetricsSnapshot,
386        _dropped_non_critical_events: u64,
387    ) -> ServerHookFuture<'a> {
388        Box::pin(async { Ok(()) })
389    }
390}
391
392/// Dispatches a single [`RaknetServerEvent`] into an [`EventFacadeHandler`].
393pub async fn dispatch_event_facade<H: EventFacadeHandler>(
394    handler: &mut H,
395    event: RaknetServerEvent,
396) -> io::Result<()> {
397    match event {
398        RaknetServerEvent::PeerConnected {
399            peer_id,
400            addr,
401            client_guid,
402            ..
403        } => {
404            handler
405                .on_connect(peer_id.as_u64(), addr.ip(), addr.port(), client_guid)
406                .await?;
407        }
408        RaknetServerEvent::PeerDisconnected {
409            peer_id, reason, ..
410        } => {
411            handler.on_disconnect(peer_id.as_u64(), reason).await?;
412        }
413        RaknetServerEvent::Packet {
414            peer_id, payload, ..
415        } => {
416            handler.on_packet(peer_id.as_u64(), payload).await?;
417        }
418        RaknetServerEvent::ReceiptAcked {
419            peer_id,
420            receipt_id,
421            ..
422        } => {
423            handler.on_ack(peer_id.as_u64(), receipt_id).await?;
424        }
425        RaknetServerEvent::Metrics {
426            shard_id,
427            snapshot,
428            dropped_non_critical_events,
429        } => {
430            handler
431                .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
432                .await?;
433        }
434        RaknetServerEvent::OfflinePacket { .. }
435        | RaknetServerEvent::PeerRateLimited { .. }
436        | RaknetServerEvent::SessionLimitReached { .. }
437        | RaknetServerEvent::ProxyDropped { .. }
438        | RaknetServerEvent::DecodeError { .. }
439        | RaknetServerEvent::WorkerError { .. }
440        | RaknetServerEvent::WorkerStopped { .. } => {}
441    }
442
443    Ok(())
444}
445
446/// Trait-based facade runner over server events.
447pub struct EventFacade<'a, H: EventFacadeHandler> {
448    server: &'a mut RaknetServer,
449    handler: &'a mut H,
450}
451
452impl<'a, H: EventFacadeHandler> EventFacade<'a, H> {
453    /// Creates a new event facade binding a server and handler.
454    pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
455        Self { server, handler }
456    }
457
458    /// Polls one event and dispatches it into handler callbacks.
459    ///
460    /// Returns `Ok(false)` when the server event stream is closed.
461    pub async fn next(&mut self) -> io::Result<bool> {
462        let Some(event) = self.server.next_event().await else {
463            return Ok(false);
464        };
465        self.dispatch(event).await?;
466        Ok(true)
467    }
468
469    /// Runs the facade loop until closure or callback error.
470    pub async fn run(&mut self) -> io::Result<()> {
471        while self.next().await? {}
472        Ok(())
473    }
474
475    /// Returns immutable access to wrapped server.
476    pub fn server(&self) -> &RaknetServer {
477        self.server
478    }
479
480    /// Returns mutable access to wrapped server.
481    pub fn server_mut(&mut self) -> &mut RaknetServer {
482        self.server
483    }
484
485    /// Returns immutable access to wrapped handler.
486    pub fn handler(&self) -> &H {
487        self.handler
488    }
489
490    /// Returns mutable access to wrapped handler.
491    pub fn handler_mut(&mut self) -> &mut H {
492        self.handler
493    }
494
495    async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
496        dispatch_event_facade(self.handler, event).await
497    }
498}
499
500pub type SessionId = u32;
501
502#[derive(Debug)]
503/// Bidirectional mapping between [`PeerId`] and stable session ids.
504pub struct SessionIdAdapter {
505    peer_to_session: FastMap<PeerId, SessionId>,
506    session_to_peer: FastMap<SessionId, PeerId>,
507    next_session_id: SessionId,
508}
509
510impl Default for SessionIdAdapter {
511    fn default() -> Self {
512        Self::new()
513    }
514}
515
516impl SessionIdAdapter {
517    /// Creates a new empty adapter.
518    pub fn new() -> Self {
519        Self {
520            peer_to_session: fast_map(),
521            session_to_peer: fast_map(),
522            next_session_id: 1,
523        }
524    }
525
526    /// Returns number of currently mapped sessions.
527    pub fn len(&self) -> usize {
528        self.peer_to_session.len()
529    }
530
531    /// Returns `true` if no session is currently mapped.
532    pub fn is_empty(&self) -> bool {
533        self.peer_to_session.is_empty()
534    }
535
536    /// Resolves a [`PeerId`] to session id.
537    pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
538        self.peer_to_session.get(&peer_id).map(|entry| *entry)
539    }
540
541    /// Resolves a session id to [`PeerId`].
542    pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
543        self.session_to_peer.get(&session_id).map(|entry| *entry)
544    }
545
546    /// Resolves an `i32` session id to [`PeerId`].
547    pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
548        let session_id = Self::session_id_from_i32(session_id)?;
549        self.peer_id_for_session(session_id)
550    }
551
552    /// Registers peer and returns mapped session id.
553    ///
554    /// If peer is already registered, existing id is returned.
555    pub fn register_peer(&mut self, peer_id: PeerId) -> io::Result<SessionId> {
556        if let Some(existing) = self.session_id_for_peer(peer_id) {
557            return Ok(existing);
558        }
559
560        let session_id = self.allocate_session_id()?;
561        self.peer_to_session.insert(peer_id, session_id);
562        self.session_to_peer.insert(session_id, peer_id);
563        Ok(session_id)
564    }
565
566    /// Unregisters peer and returns removed session id, if present.
567    pub fn unregister_peer(&mut self, peer_id: PeerId) -> Option<SessionId> {
568        let (_, session_id) = self.peer_to_session.remove(&peer_id)?;
569        self.session_to_peer.remove(&session_id);
570        Some(session_id)
571    }
572
573    /// Clears all mappings and resets id allocator.
574    pub fn clear(&mut self) {
575        self.peer_to_session.clear();
576        self.session_to_peer.clear();
577        self.next_session_id = 1;
578    }
579
580    /// Converts internal session id to `i32` when representable.
581    pub fn session_id_to_i32(session_id: SessionId) -> Option<i32> {
582        i32::try_from(session_id).ok()
583    }
584
585    /// Converts `i32` session id into internal type.
586    pub fn session_id_from_i32(session_id: i32) -> Option<SessionId> {
587        u32::try_from(session_id).ok()
588    }
589
590    fn allocate_session_id(&mut self) -> io::Result<SessionId> {
591        let mut candidate = if self.next_session_id == 0 {
592            1
593        } else {
594            self.next_session_id
595        };
596
597        for _ in 0..u32::MAX {
598            if !self.session_to_peer.contains_key(&candidate) {
599                self.next_session_id = candidate.wrapping_add(1);
600                if self.next_session_id == 0 {
601                    self.next_session_id = 1;
602                }
603                return Ok(candidate);
604            }
605
606            candidate = candidate.wrapping_add(1);
607            if candidate == 0 {
608                candidate = 1;
609            }
610        }
611
612        Err(io::Error::other("session id space exhausted"))
613    }
614}
615
616/// Session-id based callback surface.
617///
618/// Use this when your application prefers integer session ids over [`PeerId`].
619pub trait SessionFacadeHandler {
620    fn on_connect<'a>(
621        &'a mut self,
622        _session_id: SessionId,
623        _addr: IpAddr,
624        _port: u16,
625        _client_guid: u64,
626    ) -> ServerHookFuture<'a> {
627        Box::pin(async { Ok(()) })
628    }
629
630    fn on_disconnect<'a>(
631        &'a mut self,
632        _session_id: SessionId,
633        _reason: PeerDisconnectReason,
634    ) -> ServerHookFuture<'a> {
635        Box::pin(async { Ok(()) })
636    }
637
638    fn on_packet<'a>(
639        &'a mut self,
640        _session_id: SessionId,
641        _payload: Bytes,
642    ) -> ServerHookFuture<'a> {
643        Box::pin(async { Ok(()) })
644    }
645
646    fn on_ack<'a>(&'a mut self, _session_id: SessionId, _receipt_id: u64) -> ServerHookFuture<'a> {
647        Box::pin(async { Ok(()) })
648    }
649
650    fn on_metrics<'a>(
651        &'a mut self,
652        _shard_id: usize,
653        _snapshot: TransportMetricsSnapshot,
654        _dropped_non_critical_events: u64,
655    ) -> ServerHookFuture<'a> {
656        Box::pin(async { Ok(()) })
657    }
658}
659
660/// Dispatches a single event into a [`SessionFacadeHandler`].
661pub async fn dispatch_session_facade<H: SessionFacadeHandler>(
662    adapter: &mut SessionIdAdapter,
663    handler: &mut H,
664    event: RaknetServerEvent,
665) -> io::Result<()> {
666    match event {
667        RaknetServerEvent::PeerConnected {
668            peer_id,
669            addr,
670            client_guid,
671            ..
672        } => {
673            let session_id = adapter.register_peer(peer_id)?;
674            handler
675                .on_connect(session_id, addr.ip(), addr.port(), client_guid)
676                .await?;
677        }
678        RaknetServerEvent::PeerDisconnected {
679            peer_id, reason, ..
680        } => {
681            if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
682                handler.on_disconnect(session_id, reason).await?;
683                adapter.unregister_peer(peer_id);
684            } else {
685                debug!(
686                    peer_id = peer_id.as_u64(),
687                    ?reason,
688                    "ignoring disconnect for unknown session id mapping"
689                );
690            }
691        }
692        RaknetServerEvent::Packet {
693            peer_id, payload, ..
694        } => {
695            if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
696                handler.on_packet(session_id, payload).await?;
697            } else {
698                debug!(
699                    peer_id = peer_id.as_u64(),
700                    "dropping packet callback because session id mapping is missing"
701                );
702            }
703        }
704        RaknetServerEvent::ReceiptAcked {
705            peer_id,
706            receipt_id,
707            ..
708        } => {
709            if let Some(session_id) = adapter.session_id_for_peer(peer_id) {
710                handler.on_ack(session_id, receipt_id).await?;
711            } else {
712                debug!(
713                    peer_id = peer_id.as_u64(),
714                    receipt_id, "dropping ack callback because session id mapping is missing"
715                );
716            }
717        }
718        RaknetServerEvent::Metrics {
719            shard_id,
720            snapshot,
721            dropped_non_critical_events,
722        } => {
723            handler
724                .on_metrics(shard_id, *snapshot, dropped_non_critical_events)
725                .await?;
726        }
727        RaknetServerEvent::OfflinePacket { .. }
728        | RaknetServerEvent::PeerRateLimited { .. }
729        | RaknetServerEvent::SessionLimitReached { .. }
730        | RaknetServerEvent::ProxyDropped { .. }
731        | RaknetServerEvent::DecodeError { .. }
732        | RaknetServerEvent::WorkerError { .. }
733        | RaknetServerEvent::WorkerStopped { .. } => {}
734    }
735
736    Ok(())
737}
738
739/// Session-id based facade over server events.
740pub struct SessionFacade<'a, H: SessionFacadeHandler> {
741    server: &'a mut RaknetServer,
742    handler: &'a mut H,
743    adapter: SessionIdAdapter,
744}
745
746impl<'a, H: SessionFacadeHandler> SessionFacade<'a, H> {
747    /// Creates a new session facade with an empty [`SessionIdAdapter`].
748    pub fn new(server: &'a mut RaknetServer, handler: &'a mut H) -> Self {
749        Self {
750            server,
751            handler,
752            adapter: SessionIdAdapter::new(),
753        }
754    }
755
756    /// Creates a session facade with a caller-provided adapter.
757    pub fn with_adapter(
758        server: &'a mut RaknetServer,
759        handler: &'a mut H,
760        adapter: SessionIdAdapter,
761    ) -> Self {
762        Self {
763            server,
764            handler,
765            adapter,
766        }
767    }
768
769    /// Polls one event and dispatches it to the session handler.
770    ///
771    /// Returns `Ok(false)` when the server event stream is closed.
772    pub async fn next(&mut self) -> io::Result<bool> {
773        let Some(event) = self.server.next_event().await else {
774            return Ok(false);
775        };
776        self.dispatch(event).await?;
777        Ok(true)
778    }
779
780    /// Runs facade loop until stream closure or callback error.
781    pub async fn run(&mut self) -> io::Result<()> {
782        while self.next().await? {}
783        Ok(())
784    }
785
786    /// Returns immutable access to wrapped server.
787    pub fn server(&self) -> &RaknetServer {
788        self.server
789    }
790
791    /// Returns mutable access to wrapped server.
792    pub fn server_mut(&mut self) -> &mut RaknetServer {
793        self.server
794    }
795
796    /// Returns immutable access to wrapped handler.
797    pub fn handler(&self) -> &H {
798        self.handler
799    }
800
801    /// Returns mutable access to wrapped handler.
802    pub fn handler_mut(&mut self) -> &mut H {
803        self.handler
804    }
805
806    /// Returns immutable access to the session-id adapter.
807    pub fn adapter(&self) -> &SessionIdAdapter {
808        &self.adapter
809    }
810
811    /// Returns mutable access to the session-id adapter.
812    pub fn adapter_mut(&mut self) -> &mut SessionIdAdapter {
813        &mut self.adapter
814    }
815
816    /// Resolves [`PeerId`] to session id.
817    pub fn session_id_for_peer(&self, peer_id: PeerId) -> Option<SessionId> {
818        self.adapter.session_id_for_peer(peer_id)
819    }
820
821    /// Resolves session id to [`PeerId`].
822    pub fn peer_id_for_session(&self, session_id: SessionId) -> Option<PeerId> {
823        self.adapter.peer_id_for_session(session_id)
824    }
825
826    /// Resolves i32 session id to [`PeerId`].
827    pub fn peer_id_for_session_i32(&self, session_id: i32) -> Option<PeerId> {
828        self.adapter.peer_id_for_session_i32(session_id)
829    }
830
831    /// Sends payload to a session with default send options.
832    pub async fn send(
833        &mut self,
834        session_id: SessionId,
835        payload: impl Into<Bytes>,
836    ) -> io::Result<()> {
837        let peer_id = self.resolve_peer_id(session_id)?;
838        self.server.send(peer_id, payload).await
839    }
840
841    /// Sends payload to a session with explicit send options.
842    pub async fn send_with_options(
843        &mut self,
844        session_id: SessionId,
845        payload: impl Into<Bytes>,
846        options: SendOptions,
847    ) -> io::Result<()> {
848        let peer_id = self.resolve_peer_id(session_id)?;
849        self.server
850            .send_with_options(peer_id, payload, options)
851            .await
852    }
853
854    /// Sends payload to a session and tracks a receipt id.
855    pub async fn send_with_receipt(
856        &mut self,
857        session_id: SessionId,
858        payload: impl Into<Bytes>,
859        receipt_id: u64,
860    ) -> io::Result<()> {
861        let peer_id = self.resolve_peer_id(session_id)?;
862        self.server
863            .send_with_receipt(peer_id, payload, receipt_id)
864            .await
865    }
866
867    /// Disconnects a session.
868    pub async fn disconnect(&mut self, session_id: SessionId) -> io::Result<()> {
869        let peer_id = self.resolve_peer_id(session_id)?;
870        self.server.disconnect(peer_id).await
871    }
872
873    async fn dispatch(&mut self, event: RaknetServerEvent) -> io::Result<()> {
874        dispatch_session_facade(&mut self.adapter, self.handler, event).await
875    }
876
877    fn resolve_peer_id(&self, session_id: SessionId) -> io::Result<PeerId> {
878        self.peer_id_for_session(session_id).ok_or_else(|| {
879            io::Error::new(
880                io::ErrorKind::NotFound,
881                format!("session id {session_id} is not mapped to any peer"),
882            )
883        })
884    }
885}
886
887#[derive(Debug, Clone, Default)]
888/// Builder for [`RaknetServer`].
889pub struct RaknetServerBuilder {
890    transport_config: TransportConfig,
891    runtime_config: ShardedRuntimeConfig,
892}
893
894impl RaknetServerBuilder {
895    /// Replaces transport configuration.
896    pub fn transport_config(mut self, config: TransportConfig) -> Self {
897        self.transport_config = config;
898        self
899    }
900
901    /// Replaces runtime/shard configuration.
902    pub fn runtime_config(mut self, config: ShardedRuntimeConfig) -> Self {
903        self.runtime_config = config;
904        self
905    }
906
907    /// Sets bind address in transport configuration.
908    pub fn bind_addr(mut self, bind_addr: SocketAddr) -> Self {
909        self.transport_config.bind_addr = bind_addr;
910        self
911    }
912
913    /// Sets shard count (minimum `1`).
914    pub fn shard_count(mut self, shard_count: usize) -> Self {
915        self.runtime_config.shard_count = shard_count.max(1);
916        self
917    }
918
919    /// Returns mutable transport config for in-place adjustments.
920    pub fn transport_config_mut(&mut self) -> &mut TransportConfig {
921        &mut self.transport_config
922    }
923
924    /// Returns mutable runtime config for in-place adjustments.
925    pub fn runtime_config_mut(&mut self) -> &mut ShardedRuntimeConfig {
926        &mut self.runtime_config
927    }
928
929    /// Validates configs and starts the server.
930    pub async fn start(self) -> io::Result<RaknetServer> {
931        self.transport_config
932            .validate()
933            .map_err(invalid_config_io_error)?;
934        self.runtime_config
935            .validate()
936            .map_err(invalid_config_io_error)?;
937        RaknetServer::start_with_configs(self.transport_config, self.runtime_config).await
938    }
939}
940
941#[derive(Debug, Clone, Copy)]
942struct PeerBinding {
943    peer_id: PeerId,
944    shard_id: usize,
945}
946
947pub struct RaknetServer {
948    runtime: ShardedRuntimeHandle,
949    peers_by_addr: FastMap<SocketAddr, PeerBinding>,
950    addrs_by_peer: FastMap<PeerId, SocketAddr>,
951    pending_events: VecDeque<RaknetServerEvent>,
952    next_peer_id: u64,
953}
954
955impl RaknetServer {
956    /// Creates a default builder.
957    pub fn builder() -> RaknetServerBuilder {
958        RaknetServerBuilder::default()
959    }
960
961    /// Starts server with default configs and provided bind address.
962    pub async fn bind(bind_addr: SocketAddr) -> io::Result<Self> {
963        Self::builder().bind_addr(bind_addr).start().await
964    }
965
966    /// Creates closure-based facade over server event stream.
967    pub fn facade(&mut self) -> ServerFacade<'_> {
968        ServerFacade::new(self)
969    }
970
971    /// Creates trait-based event facade over server event stream.
972    pub fn event_facade<'a, H: EventFacadeHandler>(
973        &'a mut self,
974        handler: &'a mut H,
975    ) -> EventFacade<'a, H> {
976        EventFacade::new(self, handler)
977    }
978
979    /// Creates session-id based facade over server event stream.
980    pub fn session_facade<'a, H: SessionFacadeHandler>(
981        &'a mut self,
982        handler: &'a mut H,
983    ) -> SessionFacade<'a, H> {
984        SessionFacade::new(self, handler)
985    }
986
987    /// Starts server from explicit transport and runtime configurations.
988    pub async fn start_with_configs(
989        transport_config: TransportConfig,
990        runtime_config: ShardedRuntimeConfig,
991    ) -> io::Result<Self> {
992        transport_config
993            .validate()
994            .map_err(invalid_config_io_error)?;
995        runtime_config.validate().map_err(invalid_config_io_error)?;
996        let runtime = spawn_sharded_runtime(transport_config, runtime_config).await?;
997        Ok(Self {
998            runtime,
999            peers_by_addr: fast_map(),
1000            addrs_by_peer: fast_map(),
1001            pending_events: VecDeque::new(),
1002            next_peer_id: 1,
1003        })
1004    }
1005
1006    /// Returns peer socket address for a peer id, if present.
1007    pub fn peer_addr(&self, peer_id: PeerId) -> Option<SocketAddr> {
1008        self.addrs_by_peer.get(&peer_id).map(|addr| *addr)
1009    }
1010
1011    /// Returns shard id owning a peer, if present.
1012    pub fn peer_shard(&self, peer_id: PeerId) -> Option<usize> {
1013        let addr = self.addrs_by_peer.get(&peer_id).map(|addr| *addr)?;
1014        self.peers_by_addr
1015            .get(&addr)
1016            .map(|binding| binding.shard_id)
1017    }
1018
1019    /// Resolves peer id by remote address, if known.
1020    pub fn peer_id_for_addr(&self, addr: SocketAddr) -> Option<PeerId> {
1021        self.peers_by_addr.get(&addr).map(|binding| binding.peer_id)
1022    }
1023
1024    /// Sends payload with default send options.
1025    pub async fn send(&self, peer_id: PeerId, payload: impl Into<Bytes>) -> io::Result<()> {
1026        self.send_with_options(peer_id, payload, SendOptions::default())
1027            .await
1028    }
1029
1030    /// Sends payload with explicit send options.
1031    pub async fn send_with_options(
1032        &self,
1033        peer_id: PeerId,
1034        payload: impl Into<Bytes>,
1035        options: SendOptions,
1036    ) -> io::Result<()> {
1037        let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
1038
1039        self.runtime
1040            .send_payload_to_shard(
1041                shard_id,
1042                ShardedSendPayload {
1043                    addr,
1044                    payload: payload.into(),
1045                    reliability: options.reliability,
1046                    channel: options.channel,
1047                    priority: options.priority,
1048                },
1049            )
1050            .await
1051    }
1052
1053    /// Sends payload and tracks a receipt id.
1054    pub async fn send_with_receipt(
1055        &self,
1056        peer_id: PeerId,
1057        payload: impl Into<Bytes>,
1058        receipt_id: u64,
1059    ) -> io::Result<()> {
1060        self.send_with_options_and_receipt(peer_id, payload, SendOptions::default(), receipt_id)
1061            .await
1062    }
1063
1064    /// Sends payload with explicit send options and receipt id.
1065    pub async fn send_with_options_and_receipt(
1066        &self,
1067        peer_id: PeerId,
1068        payload: impl Into<Bytes>,
1069        options: SendOptions,
1070        receipt_id: u64,
1071    ) -> io::Result<()> {
1072        let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
1073
1074        self.runtime
1075            .send_payload_to_shard_with_receipt(
1076                shard_id,
1077                ShardedSendPayload {
1078                    addr,
1079                    payload: payload.into(),
1080                    reliability: options.reliability,
1081                    channel: options.channel,
1082                    priority: options.priority,
1083                },
1084                receipt_id,
1085            )
1086            .await
1087    }
1088
1089    /// Requests disconnection and emits a local disconnect event.
1090    pub async fn disconnect(&mut self, peer_id: PeerId) -> io::Result<()> {
1091        let (addr, shard_id) = self.resolve_peer_route(peer_id)?;
1092        info!(
1093            peer_id = peer_id.as_u64(),
1094            %addr,
1095            shard_id,
1096            "server disconnect requested"
1097        );
1098
1099        self.runtime
1100            .disconnect_peer_from_shard(shard_id, addr)
1101            .await?;
1102        self.remove_peer(addr);
1103        self.pending_events
1104            .push_back(RaknetServerEvent::PeerDisconnected {
1105                peer_id,
1106                addr,
1107                reason: PeerDisconnectReason::Requested,
1108            });
1109        Ok(())
1110    }
1111
1112    /// Polls next server event.
1113    ///
1114    /// Returns `None` when runtime event channel is closed.
1115    pub async fn next_event(&mut self) -> Option<RaknetServerEvent> {
1116        if let Some(event) = self.pending_events.pop_front() {
1117            return Some(event);
1118        }
1119
1120        loop {
1121            let runtime_event = self.runtime.event_rx.recv().await?;
1122            self.enqueue_runtime_event(runtime_event);
1123            if let Some(event) = self.pending_events.pop_front() {
1124                return Some(event);
1125            }
1126        }
1127    }
1128
1129    /// Gracefully stops all workers and closes sockets.
1130    pub async fn shutdown(self) -> io::Result<()> {
1131        self.runtime.shutdown().await
1132    }
1133
1134    fn enqueue_runtime_event(&mut self, runtime_event: ShardedRuntimeEvent) {
1135        match runtime_event {
1136            ShardedRuntimeEvent::Transport { shard_id, event } => match event {
1137                TransportEvent::PeerDisconnected { addr, reason } => {
1138                    if let Some(peer_id) = self.remove_peer(addr) {
1139                        let reason = match reason {
1140                            RemoteDisconnectReason::DisconnectionNotification { reason_code } => {
1141                                PeerDisconnectReason::RemoteDisconnectionNotification {
1142                                    reason_code,
1143                                }
1144                            }
1145                            RemoteDisconnectReason::DetectLostConnection => {
1146                                PeerDisconnectReason::RemoteDetectLostConnection
1147                            }
1148                        };
1149                        info!(
1150                            peer_id = peer_id.as_u64(),
1151                            %addr,
1152                            ?reason,
1153                            "peer disconnected"
1154                        );
1155                        self.pending_events
1156                            .push_back(RaknetServerEvent::PeerDisconnected {
1157                                peer_id,
1158                                addr,
1159                                reason,
1160                            });
1161                    } else {
1162                        debug!(
1163                            %addr,
1164                            ?reason,
1165                            "received peer disconnect for unknown address"
1166                        );
1167                    }
1168                }
1169                TransportEvent::ConnectedFrames {
1170                    addr,
1171                    client_guid,
1172                    frames,
1173                    receipts,
1174                    ..
1175                } => {
1176                    let has_frames = !frames.is_empty();
1177                    let has_receipts = !receipts.acked_receipt_ids.is_empty();
1178
1179                    if client_guid.is_none() && !has_frames && !has_receipts {
1180                        debug!(
1181                            %addr,
1182                            shard_id,
1183                            "ignoring pre-connect transport event without frames/receipts"
1184                        );
1185                    } else {
1186                        let (peer_id, is_new) = self.ensure_peer(addr, shard_id);
1187                        if is_new {
1188                            let client_guid = client_guid.unwrap_or(peer_id.as_u64());
1189                            info!(
1190                                peer_id = peer_id.as_u64(),
1191                                %addr,
1192                                client_guid,
1193                                shard_id,
1194                                "peer connected"
1195                            );
1196                            self.pending_events
1197                                .push_back(RaknetServerEvent::PeerConnected {
1198                                    peer_id,
1199                                    addr,
1200                                    client_guid,
1201                                    shard_id,
1202                                });
1203                        }
1204
1205                        for frame in frames {
1206                            self.pending_events.push_back(RaknetServerEvent::Packet {
1207                                peer_id,
1208                                addr,
1209                                payload: frame.payload,
1210                                reliability: frame.reliability,
1211                                reliable_index: frame.reliable_index,
1212                                sequence_index: frame.sequence_index,
1213                                ordering_index: frame.ordering_index,
1214                                ordering_channel: frame.ordering_channel,
1215                            });
1216                        }
1217
1218                        for receipt_id in receipts.acked_receipt_ids {
1219                            self.pending_events
1220                                .push_back(RaknetServerEvent::ReceiptAcked {
1221                                    peer_id,
1222                                    addr,
1223                                    receipt_id,
1224                                });
1225                        }
1226                    }
1227                }
1228                TransportEvent::RateLimited { addr } => {
1229                    warn!(%addr, "peer rate-limited");
1230                    self.pending_events
1231                        .push_back(RaknetServerEvent::PeerRateLimited { addr });
1232                }
1233                TransportEvent::SessionLimitReached { addr } => {
1234                    warn!(%addr, "session limit reached");
1235                    self.pending_events
1236                        .push_back(RaknetServerEvent::SessionLimitReached { addr });
1237                }
1238                TransportEvent::ConnectedDatagramDroppedNoSession { .. } => {}
1239                TransportEvent::ProxyDropped { addr } => {
1240                    debug!(%addr, "proxy router dropped packet");
1241                    self.pending_events
1242                        .push_back(RaknetServerEvent::ProxyDropped { addr });
1243                }
1244                TransportEvent::DecodeError { addr, error } => {
1245                    warn!(%addr, %error, "transport decode error");
1246                    self.pending_events
1247                        .push_back(RaknetServerEvent::DecodeError {
1248                            addr,
1249                            error: error.to_string(),
1250                        });
1251                }
1252                TransportEvent::OfflinePacket { addr, packet } => {
1253                    self.pending_events
1254                        .push_back(RaknetServerEvent::OfflinePacket { addr, packet });
1255                }
1256            },
1257            ShardedRuntimeEvent::Metrics {
1258                shard_id,
1259                snapshot,
1260                dropped_non_critical_events,
1261            } => {
1262                if dropped_non_critical_events > 0 {
1263                    debug!(
1264                        shard_id,
1265                        dropped_non_critical_events,
1266                        "non-critical runtime events were dropped before metrics emit"
1267                    );
1268                }
1269                self.pending_events.push_back(RaknetServerEvent::Metrics {
1270                    shard_id,
1271                    snapshot,
1272                    dropped_non_critical_events,
1273                });
1274            }
1275            ShardedRuntimeEvent::WorkerError { shard_id, message } => {
1276                warn!(shard_id, %message, "runtime worker error");
1277                self.pending_events
1278                    .push_back(RaknetServerEvent::WorkerError { shard_id, message });
1279            }
1280            ShardedRuntimeEvent::WorkerStopped { shard_id } => {
1281                warn!(shard_id, "runtime worker stopped");
1282                let mut disconnected = Vec::new();
1283                for peer in self.peers_by_addr.iter() {
1284                    let addr = *peer.key();
1285                    let binding = *peer.value();
1286                    if binding.shard_id == shard_id {
1287                        disconnected.push((addr, binding.peer_id));
1288                    }
1289                }
1290                for (addr, peer_id) in disconnected {
1291                    self.remove_peer(addr);
1292                    info!(
1293                        peer_id = peer_id.as_u64(),
1294                        %addr,
1295                        shard_id,
1296                        "peer disconnected because worker stopped"
1297                    );
1298                    self.pending_events
1299                        .push_back(RaknetServerEvent::PeerDisconnected {
1300                            peer_id,
1301                            addr,
1302                            reason: PeerDisconnectReason::WorkerStopped { shard_id },
1303                        });
1304                }
1305                self.pending_events
1306                    .push_back(RaknetServerEvent::WorkerStopped { shard_id });
1307            }
1308        }
1309    }
1310
1311    fn ensure_peer(&mut self, addr: SocketAddr, shard_id: usize) -> (PeerId, bool) {
1312        if let Some(mut binding) = self.peers_by_addr.get_mut(&addr) {
1313            if binding.shard_id != shard_id {
1314                binding.shard_id = shard_id;
1315            }
1316            return (binding.peer_id, false);
1317        }
1318
1319        let peer_id = PeerId(self.next_peer_id);
1320        self.next_peer_id = self.next_peer_id.saturating_add(1);
1321        self.peers_by_addr
1322            .insert(addr, PeerBinding { peer_id, shard_id });
1323        self.addrs_by_peer.insert(peer_id, addr);
1324        (peer_id, true)
1325    }
1326
1327    fn remove_peer(&mut self, addr: SocketAddr) -> Option<PeerId> {
1328        let (_, binding) = self.peers_by_addr.remove(&addr)?;
1329        self.addrs_by_peer.remove(&binding.peer_id);
1330        Some(binding.peer_id)
1331    }
1332
1333    fn resolve_peer_route(&self, peer_id: PeerId) -> io::Result<(SocketAddr, usize)> {
1334        let addr = self
1335            .addrs_by_peer
1336            .get(&peer_id)
1337            .map(|entry| *entry)
1338            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer id not found"))?;
1339        let shard_id = self
1340            .peers_by_addr
1341            .get(&addr)
1342            .map(|binding| binding.shard_id)
1343            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "peer shard binding missing"))?;
1344        Ok((addr, shard_id))
1345    }
1346}
1347
1348fn invalid_config_io_error(error: ConfigValidationError) -> io::Error {
1349    io::Error::new(io::ErrorKind::InvalidInput, error.to_string())
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354    use super::{
1355        EventFacadeHandler, PeerDisconnectReason, PeerId, RaknetServer, RaknetServerBuilder,
1356        RaknetServerEvent, ServerHookFuture, SessionFacadeHandler, SessionId, SessionIdAdapter,
1357        dispatch_event_facade, dispatch_session_facade,
1358    };
1359    use crate::protocol::reliability::Reliability;
1360    use crate::transport::{ShardedRuntimeConfig, TransportConfig, TransportMetricsSnapshot};
1361    use bytes::Bytes;
1362    use std::io;
1363    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1364
1365    #[test]
1366    fn builder_mutators_keep_values() {
1367        let builder = RaknetServerBuilder::default().shard_count(4);
1368        assert_eq!(builder.runtime_config.shard_count, 4);
1369    }
1370
1371    #[test]
1372    fn peer_id_roundtrip() {
1373        let peer = PeerId::from_u64(42);
1374        assert_eq!(peer.as_u64(), 42);
1375    }
1376
1377    #[test]
1378    fn builder_type_is_exposed() {
1379        let _ = RaknetServer::builder();
1380    }
1381
1382    #[tokio::test]
1383    async fn start_with_invalid_runtime_config_fails_fast() {
1384        let transport = TransportConfig {
1385            bind_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1386            ..TransportConfig::default()
1387        };
1388        let runtime = ShardedRuntimeConfig {
1389            shard_count: 0,
1390            ..ShardedRuntimeConfig::default()
1391        };
1392
1393        match RaknetServer::start_with_configs(transport, runtime).await {
1394            Ok(_) => panic!("invalid config must fail before runtime start"),
1395            Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
1396        }
1397    }
1398
1399    #[derive(Default)]
1400    struct CountingEventHandler {
1401        connect_calls: usize,
1402        disconnect_calls: usize,
1403        packet_calls: usize,
1404        ack_calls: usize,
1405        metrics_calls: usize,
1406        last_connect: Option<(u64, IpAddr, u16, u64)>,
1407        last_disconnect: Option<(u64, PeerDisconnectReason)>,
1408        last_packet: Option<(u64, Bytes)>,
1409        last_ack: Option<(u64, u64)>,
1410        last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1411    }
1412
1413    impl EventFacadeHandler for CountingEventHandler {
1414        fn on_connect<'a>(
1415            &'a mut self,
1416            session_id: u64,
1417            addr: IpAddr,
1418            port: u16,
1419            client_guid: u64,
1420        ) -> ServerHookFuture<'a> {
1421            self.connect_calls = self.connect_calls.saturating_add(1);
1422            self.last_connect = Some((session_id, addr, port, client_guid));
1423            Box::pin(async { Ok(()) })
1424        }
1425
1426        fn on_disconnect<'a>(
1427            &'a mut self,
1428            session_id: u64,
1429            reason: PeerDisconnectReason,
1430        ) -> ServerHookFuture<'a> {
1431            self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1432            self.last_disconnect = Some((session_id, reason));
1433            Box::pin(async { Ok(()) })
1434        }
1435
1436        fn on_packet<'a>(&'a mut self, session_id: u64, payload: Bytes) -> ServerHookFuture<'a> {
1437            self.packet_calls = self.packet_calls.saturating_add(1);
1438            self.last_packet = Some((session_id, payload));
1439            Box::pin(async { Ok(()) })
1440        }
1441
1442        fn on_ack<'a>(&'a mut self, session_id: u64, receipt_id: u64) -> ServerHookFuture<'a> {
1443            self.ack_calls = self.ack_calls.saturating_add(1);
1444            self.last_ack = Some((session_id, receipt_id));
1445            Box::pin(async { Ok(()) })
1446        }
1447
1448        fn on_metrics<'a>(
1449            &'a mut self,
1450            shard_id: usize,
1451            snapshot: TransportMetricsSnapshot,
1452            dropped_non_critical_events: u64,
1453        ) -> ServerHookFuture<'a> {
1454            self.metrics_calls = self.metrics_calls.saturating_add(1);
1455            self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1456            Box::pin(async { Ok(()) })
1457        }
1458    }
1459
1460    #[tokio::test]
1461    async fn dispatch_event_facade_maps_callbacks() -> io::Result<()> {
1462        let mut handler = CountingEventHandler::default();
1463        let peer_id = PeerId::from_u64(77);
1464        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)), 19132);
1465        let payload = Bytes::from_static(b"\x01\x02event");
1466        let metrics = TransportMetricsSnapshot {
1467            packets_forwarded_total: 33,
1468            bytes_forwarded_total: 1200,
1469            ..TransportMetricsSnapshot::default()
1470        };
1471
1472        dispatch_event_facade(
1473            &mut handler,
1474            RaknetServerEvent::PeerConnected {
1475                peer_id,
1476                addr,
1477                client_guid: 0xAABB_CCDD_EEFF_0011,
1478                shard_id: 2,
1479            },
1480        )
1481        .await?;
1482
1483        dispatch_event_facade(
1484            &mut handler,
1485            RaknetServerEvent::Packet {
1486                peer_id,
1487                addr,
1488                payload: payload.clone(),
1489                reliability: Reliability::ReliableOrdered,
1490                reliable_index: None,
1491                sequence_index: None,
1492                ordering_index: None,
1493                ordering_channel: None,
1494            },
1495        )
1496        .await?;
1497
1498        dispatch_event_facade(
1499            &mut handler,
1500            RaknetServerEvent::ReceiptAcked {
1501                peer_id,
1502                addr,
1503                receipt_id: 9001,
1504            },
1505        )
1506        .await?;
1507
1508        dispatch_event_facade(
1509            &mut handler,
1510            RaknetServerEvent::Metrics {
1511                shard_id: 2,
1512                snapshot: Box::new(metrics),
1513                dropped_non_critical_events: 5,
1514            },
1515        )
1516        .await?;
1517
1518        dispatch_event_facade(
1519            &mut handler,
1520            RaknetServerEvent::PeerDisconnected {
1521                peer_id,
1522                addr,
1523                reason: PeerDisconnectReason::Requested,
1524            },
1525        )
1526        .await?;
1527
1528        dispatch_event_facade(
1529            &mut handler,
1530            RaknetServerEvent::WorkerStopped { shard_id: 0 },
1531        )
1532        .await?;
1533
1534        assert_eq!(handler.connect_calls, 1);
1535        assert_eq!(handler.packet_calls, 1);
1536        assert_eq!(handler.ack_calls, 1);
1537        assert_eq!(handler.metrics_calls, 1);
1538        assert_eq!(handler.disconnect_calls, 1);
1539        assert_eq!(
1540            handler.last_connect,
1541            Some((77, addr.ip(), addr.port(), 0xAABB_CCDD_EEFF_0011))
1542        );
1543        assert_eq!(handler.last_packet, Some((77, payload)));
1544        assert_eq!(handler.last_ack, Some((77, 9001)));
1545        let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1546            .last_metrics
1547            .expect("metrics callback should store last snapshot");
1548        assert_eq!(metrics_shard, 2);
1549        assert_eq!(metrics_dropped, 5);
1550        assert_eq!(metrics_snapshot.packets_forwarded_total, 33);
1551        assert_eq!(metrics_snapshot.bytes_forwarded_total, 1200);
1552        assert_eq!(
1553            handler.last_disconnect,
1554            Some((77, PeerDisconnectReason::Requested))
1555        );
1556
1557        Ok(())
1558    }
1559
1560    #[derive(Default)]
1561    struct CountingSessionHandler {
1562        connect_calls: usize,
1563        disconnect_calls: usize,
1564        packet_calls: usize,
1565        ack_calls: usize,
1566        metrics_calls: usize,
1567        last_connect: Option<(SessionId, IpAddr, u16, u64)>,
1568        last_disconnect: Option<(SessionId, PeerDisconnectReason)>,
1569        last_packet: Option<(SessionId, Bytes)>,
1570        last_ack: Option<(SessionId, u64)>,
1571        last_metrics: Option<(usize, TransportMetricsSnapshot, u64)>,
1572    }
1573
1574    impl SessionFacadeHandler for CountingSessionHandler {
1575        fn on_connect<'a>(
1576            &'a mut self,
1577            session_id: SessionId,
1578            addr: IpAddr,
1579            port: u16,
1580            client_guid: u64,
1581        ) -> ServerHookFuture<'a> {
1582            self.connect_calls = self.connect_calls.saturating_add(1);
1583            self.last_connect = Some((session_id, addr, port, client_guid));
1584            Box::pin(async { Ok(()) })
1585        }
1586
1587        fn on_disconnect<'a>(
1588            &'a mut self,
1589            session_id: SessionId,
1590            reason: PeerDisconnectReason,
1591        ) -> ServerHookFuture<'a> {
1592            self.disconnect_calls = self.disconnect_calls.saturating_add(1);
1593            self.last_disconnect = Some((session_id, reason));
1594            Box::pin(async { Ok(()) })
1595        }
1596
1597        fn on_packet<'a>(
1598            &'a mut self,
1599            session_id: SessionId,
1600            payload: Bytes,
1601        ) -> ServerHookFuture<'a> {
1602            self.packet_calls = self.packet_calls.saturating_add(1);
1603            self.last_packet = Some((session_id, payload));
1604            Box::pin(async { Ok(()) })
1605        }
1606
1607        fn on_ack<'a>(
1608            &'a mut self,
1609            session_id: SessionId,
1610            receipt_id: u64,
1611        ) -> ServerHookFuture<'a> {
1612            self.ack_calls = self.ack_calls.saturating_add(1);
1613            self.last_ack = Some((session_id, receipt_id));
1614            Box::pin(async { Ok(()) })
1615        }
1616
1617        fn on_metrics<'a>(
1618            &'a mut self,
1619            shard_id: usize,
1620            snapshot: TransportMetricsSnapshot,
1621            dropped_non_critical_events: u64,
1622        ) -> ServerHookFuture<'a> {
1623            self.metrics_calls = self.metrics_calls.saturating_add(1);
1624            self.last_metrics = Some((shard_id, snapshot, dropped_non_critical_events));
1625            Box::pin(async { Ok(()) })
1626        }
1627    }
1628
1629    #[test]
1630    fn session_id_adapter_bridges_peer_and_signed_ids() {
1631        let mut adapter = SessionIdAdapter::new();
1632        let peer_a = PeerId::from_u64(0x1_0000_0001);
1633        let peer_b = PeerId::from_u64(0x2_0000_0002);
1634
1635        let session_a = adapter
1636            .register_peer(peer_a)
1637            .expect("first session id allocation should succeed");
1638        let session_b = adapter
1639            .register_peer(peer_b)
1640            .expect("second session id allocation should succeed");
1641
1642        assert_eq!(session_a, 1);
1643        assert_eq!(session_b, 2);
1644        assert_eq!(adapter.session_id_for_peer(peer_a), Some(session_a));
1645        assert_eq!(adapter.peer_id_for_session(session_b), Some(peer_b));
1646        assert_eq!(adapter.peer_id_for_session_i32(2), Some(peer_b));
1647        assert_eq!(adapter.peer_id_for_session_i32(-1), None);
1648        assert_eq!(SessionIdAdapter::session_id_to_i32(session_a), Some(1));
1649        assert_eq!(SessionIdAdapter::session_id_from_i32(2), Some(2));
1650        assert_eq!(SessionIdAdapter::session_id_from_i32(-5), None);
1651
1652        assert_eq!(adapter.unregister_peer(peer_a), Some(session_a));
1653        assert_eq!(adapter.session_id_for_peer(peer_a), None);
1654        assert_eq!(adapter.peer_id_for_session(session_a), None);
1655    }
1656
1657    #[tokio::test]
1658    async fn dispatch_session_facade_maps_callbacks_and_releases_mapping() -> io::Result<()> {
1659        let mut adapter = SessionIdAdapter::new();
1660        let mut handler = CountingSessionHandler::default();
1661        let peer_id = PeerId::from_u64(0xDEAD_BEEF_F00D);
1662        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 1, 2, 3)), 19133);
1663        let payload = Bytes::from_static(b"\x10\x20session");
1664        let metrics = TransportMetricsSnapshot {
1665            packets_forwarded_total: 5,
1666            bytes_forwarded_total: 80,
1667            ..TransportMetricsSnapshot::default()
1668        };
1669
1670        dispatch_session_facade(
1671            &mut adapter,
1672            &mut handler,
1673            RaknetServerEvent::PeerConnected {
1674                peer_id,
1675                addr,
1676                client_guid: 0xABCD_EF01_0203_0405,
1677                shard_id: 0,
1678            },
1679        )
1680        .await?;
1681
1682        let session_id = adapter
1683            .session_id_for_peer(peer_id)
1684            .expect("session id should be registered after connect");
1685        assert_eq!(session_id, 1);
1686
1687        dispatch_session_facade(
1688            &mut adapter,
1689            &mut handler,
1690            RaknetServerEvent::Packet {
1691                peer_id,
1692                addr,
1693                payload: payload.clone(),
1694                reliability: Reliability::ReliableOrdered,
1695                reliable_index: None,
1696                sequence_index: None,
1697                ordering_index: None,
1698                ordering_channel: None,
1699            },
1700        )
1701        .await?;
1702
1703        dispatch_session_facade(
1704            &mut adapter,
1705            &mut handler,
1706            RaknetServerEvent::ReceiptAcked {
1707                peer_id,
1708                addr,
1709                receipt_id: 44,
1710            },
1711        )
1712        .await?;
1713
1714        dispatch_session_facade(
1715            &mut adapter,
1716            &mut handler,
1717            RaknetServerEvent::Metrics {
1718                shard_id: 0,
1719                snapshot: Box::new(metrics),
1720                dropped_non_critical_events: 7,
1721            },
1722        )
1723        .await?;
1724
1725        dispatch_session_facade(
1726            &mut adapter,
1727            &mut handler,
1728            RaknetServerEvent::PeerDisconnected {
1729                peer_id,
1730                addr,
1731                reason: PeerDisconnectReason::Requested,
1732            },
1733        )
1734        .await?;
1735
1736        assert_eq!(handler.connect_calls, 1);
1737        assert_eq!(handler.packet_calls, 1);
1738        assert_eq!(handler.ack_calls, 1);
1739        assert_eq!(handler.metrics_calls, 1);
1740        assert_eq!(handler.disconnect_calls, 1);
1741        assert_eq!(
1742            handler.last_connect,
1743            Some((1, addr.ip(), addr.port(), 0xABCD_EF01_0203_0405))
1744        );
1745        assert_eq!(handler.last_packet, Some((1, payload)));
1746        assert_eq!(handler.last_ack, Some((1, 44)));
1747        assert_eq!(
1748            handler.last_disconnect,
1749            Some((1, PeerDisconnectReason::Requested))
1750        );
1751        assert_eq!(adapter.session_id_for_peer(peer_id), None);
1752        assert_eq!(adapter.peer_id_for_session(1), None);
1753
1754        let (metrics_shard, metrics_snapshot, metrics_dropped) = handler
1755            .last_metrics
1756            .expect("metrics callback should store last snapshot");
1757        assert_eq!(metrics_shard, 0);
1758        assert_eq!(metrics_dropped, 7);
1759        assert_eq!(metrics_snapshot.packets_forwarded_total, 5);
1760        assert_eq!(metrics_snapshot.bytes_forwarded_total, 80);
1761
1762        Ok(())
1763    }
1764}