naia_server/
server.rs

1use std::{
2    any::Any,
3    collections::{hash_set::Iter, HashMap, HashSet},
4    hash::Hash,
5    net::SocketAddr,
6    panic,
7    time::Duration,
8};
9
10use log::{info, warn};
11
12use naia_shared::{
13    BigMap, BitReader, BitWriter, Channel, ChannelKind, ComponentKind,
14    EntityAndGlobalEntityConverter, EntityAndLocalEntityConverter, EntityAuthStatus,
15    EntityConverterMut, EntityDoesNotExistError, EntityEventMessage, EntityResponseEvent,
16    FakeEntityConverter, GlobalEntity, GlobalRequestId, GlobalResponseId, GlobalWorldManagerType,
17    Instant, Message, MessageContainer, PacketType, Protocol, RemoteEntity, Replicate,
18    ReplicatedComponent, Request, Response, ResponseReceiveKey, ResponseSendKey, Serde, SerdeErr,
19    SharedGlobalWorldManager, SocketConfig, StandardHeader, SystemChannel, Tick, Timer,
20    WorldMutType, WorldRefType,
21};
22
23use super::{
24    error::NaiaServerError,
25    events::Events,
26    room::{Room, RoomKey, RoomMut, RoomRef},
27    server_config::ServerConfig,
28    user::{User, UserKey, UserMut, UserRef},
29    user_scope::{UserScopeMut, UserScopeRef},
30};
31use crate::{
32    connection::{connection::Connection, io::Io, tick_buffer_messages::TickBufferMessages},
33    handshake::{HandshakeAction, HandshakeManager, Handshaker},
34    request::{GlobalRequestManager, GlobalResponseManager},
35    time_manager::TimeManager,
36    transport::{AuthReceiver, AuthSender, Socket},
37    world::{
38        entity_mut::EntityMut, entity_owner::EntityOwner, entity_ref::EntityRef,
39        entity_room_map::EntityRoomMap, entity_scope_map::EntityScopeMap,
40        global_world_manager::GlobalWorldManager, server_auth_handler::AuthOwner,
41    },
42    ReplicationConfig,
43};
44
/// A server that uses either UDP or WebRTC communication to send/receive
/// messages to/from connected clients, and syncs registered entities to
/// clients to whom they are in-scope
///
/// `E` is the entity handle type used by the application's world.
pub struct Server<E: Copy + Eq + Hash + Send + Sync> {
    // Config
    /// Server-wide settings; its `connection` and `ping` sections are handed to
    /// every newly created `Connection`.
    server_config: ServerConfig,
    /// Protocol definition, locked in `new` before being stored.
    protocol: Protocol,
    /// Packet IO layer; loaded with the socket's packet sender/receiver in `listen`.
    io: Io,
    /// Auth channel of the socket, `None` until `listen` has been called.
    auth_io: Option<(Box<dyn AuthSender>, Box<dyn AuthReceiver>)>,
    /// Timer built from `connection.heartbeat_interval`.
    heartbeat_timer: Timer,
    /// Timer built from `connection.disconnection_timeout_duration`.
    timeout_timer: Timer,
    /// Timer built from `ping.ping_interval`.
    ping_timer: Timer,
    /// Handshake implementation; told about identity tokens of accepted users.
    handshake_manager: Box<dyn Handshaker>,
    // Users
    /// All known users, keyed by `UserKey`.
    users: BigMap<UserKey, User>,
    /// Established connections, keyed by the user's socket address.
    user_connections: HashMap<SocketAddr, Connection<E>>,
    // Rooms
    /// All rooms, keyed by `RoomKey`.
    rooms: BigMap<RoomKey, Room<E>>,
    // Entities
    entity_room_map: EntityRoomMap<E>,
    entity_scope_map: EntityScopeMap<E>,
    /// Server-side record of replicated entities (owner, replication config, authority).
    global_world_manager: GlobalWorldManager<E>,
    // Events
    /// Events accumulated since the last call to `receive`, which drains them.
    incoming_events: Events<E>,
    // Requests/Responses
    /// Bookkeeping for requests sent by the server that await a response.
    global_request_manager: GlobalRequestManager,
    /// Bookkeeping for requests received by the server that await its response.
    global_response_manager: GlobalResponseManager,
    // Ticks
    /// Tracks the server tick; constructed from the protocol's tick interval.
    time_manager: TimeManager,
}
75
76impl<E: Copy + Eq + Hash + Send + Sync> Server<E> {
77    /// Create a new Server
78    pub fn new<P: Into<Protocol>>(server_config: ServerConfig, protocol: P) -> Self {
79        let mut protocol: Protocol = protocol.into();
80        protocol.lock();
81
82        let time_manager = TimeManager::new(protocol.tick_interval);
83
84        let io = Io::new(
85            &server_config.connection.bandwidth_measure_duration,
86            &protocol.compression,
87        );
88
89        Self {
90            // Config
91            server_config: server_config.clone(),
92            protocol,
93            // Connection
94            io,
95            auth_io: None,
96            heartbeat_timer: Timer::new(server_config.connection.heartbeat_interval),
97            timeout_timer: Timer::new(server_config.connection.disconnection_timeout_duration),
98            ping_timer: Timer::new(server_config.ping.ping_interval),
99            handshake_manager: Box::new(HandshakeManager::new()),
100            // Users
101            users: BigMap::new(),
102            user_connections: HashMap::new(),
103            // Rooms
104            rooms: BigMap::new(),
105            // Entities
106            entity_room_map: EntityRoomMap::new(),
107            entity_scope_map: EntityScopeMap::new(),
108            global_world_manager: GlobalWorldManager::new(),
109            // Events
110            incoming_events: Events::new(),
111            // Requests/Responses
112            global_request_manager: GlobalRequestManager::new(),
113            global_response_manager: GlobalResponseManager::new(),
114            // Ticks
115            time_manager,
116        }
117    }
118
119    /// Listen at the given addresses
120    pub fn listen<S: Into<Box<dyn Socket>>>(&mut self, socket: S) {
121        let boxed_socket: Box<dyn Socket> = socket.into();
122        let (auth_sender, auth_receiver, packet_sender, packet_receiver) = boxed_socket.listen();
123
124        self.io.load(packet_sender, packet_receiver);
125
126        self.auth_io = Some((auth_sender, auth_receiver));
127    }
128
    /// Returns whether or not the Server has initialized correctly and is
    /// listening for Clients.
    ///
    /// This reports whether the IO layer has been loaded, which is what `listen` does.
    pub fn is_listening(&self) -> bool {
        self.io.is_loaded()
    }
134
    /// Returns a reference to the socket config stored in the Server's protocol.
    pub fn socket_config(&self) -> &SocketConfig {
        &self.protocol.socket
    }
139
140    /// Must be called regularly, maintains connection to and receives messages
141    /// from all Clients
142    pub fn receive<W: WorldMutType<E>>(&mut self, world: W) -> Events<E> {
143        let now = Instant::now();
144
145        // Need to run this to maintain connection with all clients, and receive packets
146        // until none left
147        self.maintain_socket(world, &now);
148
149        // tick event
150        if self.time_manager.recv_server_tick(&now) {
151            self.incoming_events
152                .push_tick(self.time_manager.current_tick());
153        }
154
155        // return all received messages and reset the buffer
156        std::mem::replace(&mut self.incoming_events, Events::<E>::new())
157    }
158
159    // Connections
160
161    /// Accepts an incoming Client User, allowing them to establish a connection
162    /// with the Server
163    pub fn accept_connection(&mut self, user_key: &UserKey) {
164        let Some(user) = self.users.get_mut(user_key) else {
165            warn!("unknown user is finalizing connection...");
166            return;
167        };
168        let auth_addr = user.take_auth_address();
169
170        // info!("adding authenticated user {}", &auth_addr);
171        let identity_token = naia_shared::generate_identity_token();
172        self.handshake_manager
173            .authenticate_user(&identity_token, user_key);
174
175        let (auth_sender, _) = self
176            .auth_io
177            .as_mut()
178            .expect("Auth should be set up by this point");
179        if auth_sender.accept(&auth_addr, &identity_token).is_err() {
180            warn!(
181                "Server Error: Cannot send auth accept packet to {:?}",
182                &auth_addr
183            );
184            // TODO: handle destroying any threads waiting on this response
185            return;
186        }
187    }
188
189    /// Rejects an incoming Client User, terminating their attempt to establish
190    /// a connection with the Server
191    pub fn reject_connection(&mut self, user_key: &UserKey) {
192        if let Some(user) = self.users.get_mut(user_key) {
193            let auth_addr = user.take_auth_address();
194
195            // info!("rejecting authenticated user {:?}", &auth_addr);
196            let (auth_sender, _) = self
197                .auth_io
198                .as_mut()
199                .expect("Auth should be set up by this point");
200            if auth_sender.reject(&auth_addr).is_err() {
201                warn!(
202                    "Server Error: Cannot send auth reject message to {:?}",
203                    &auth_addr
204                );
205                // TODO: handle destroying any threads waiting on this response
206            }
207
208            self.user_delete(user_key);
209        }
210    }
211
212    fn finalize_connection(&mut self, user_key: &UserKey, user_address: &SocketAddr) {
213        let Some(user) = self.users.get_mut(user_key) else {
214            warn!("unknown user is finalizing connection...");
215            return;
216        };
217        user.set_address(user_address);
218        let new_connection = Connection::new(
219            &self.server_config.connection,
220            &self.server_config.ping,
221            &user.address(),
222            user_key,
223            &self.protocol.channel_kinds,
224            &self.global_world_manager,
225        );
226
227        self.user_connections.insert(user.address(), new_connection);
228        if self.io.bandwidth_monitor_enabled() {
229            self.io.register_client(&user.address());
230        }
231        self.incoming_events.push_connection(user_key);
232    }
233
234    // Messages
235
236    /// Queues up an Message to be sent to the Client associated with a given
237    /// UserKey
238    pub fn send_message<C: Channel, M: Message>(&mut self, user_key: &UserKey, message: &M) {
239        let cloned_message = M::clone_box(message);
240        self.send_message_inner(user_key, &ChannelKind::of::<C>(), cloned_message);
241    }
242
243    /// Queues up an Message to be sent to the Client associated with a given
244    /// UserKey
245    fn send_message_inner(
246        &mut self,
247        user_key: &UserKey,
248        channel_kind: &ChannelKind,
249        message_box: Box<dyn Message>,
250    ) {
251        let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
252
253        if !channel_settings.can_send_to_client() {
254            panic!("Cannot send message to Client on this Channel");
255        }
256
257        if let Some(user) = self.users.get(user_key) {
258            if !user.has_address() {
259                return;
260            }
261            if let Some(connection) = self.user_connections.get_mut(&user.address()) {
262                let mut converter = EntityConverterMut::new(
263                    &self.global_world_manager,
264                    &mut connection.base.local_world_manager,
265                );
266                let message = MessageContainer::from_write(message_box, &mut converter);
267                connection.base.message_manager.send_message(
268                    &self.protocol.message_kinds,
269                    &mut converter,
270                    channel_kind,
271                    message,
272                );
273            }
274        }
275    }
276
277    /// Sends a message to all connected users using a given channel
278    pub fn broadcast_message<C: Channel, M: Message>(&mut self, message: &M) {
279        let cloned_message = M::clone_box(message);
280        self.broadcast_message_inner(&ChannelKind::of::<C>(), cloned_message);
281    }
282
283    fn broadcast_message_inner(
284        &mut self,
285        channel_kind: &ChannelKind,
286        message_box: Box<dyn Message>,
287    ) {
288        self.user_keys().iter().for_each(|user_key| {
289            self.send_message_inner(user_key, channel_kind, message_box.clone())
290        })
291    }
292
293    //
294    pub fn send_request<C: Channel, Q: Request>(
295        &mut self,
296        user_key: &UserKey,
297        request: &Q,
298    ) -> Result<ResponseReceiveKey<Q::Response>, NaiaServerError> {
299        let cloned_request = Q::clone_box(request);
300        let id = self.send_request_inner(user_key, &ChannelKind::of::<C>(), cloned_request)?;
301        Ok(ResponseReceiveKey::new(id))
302    }
303
304    fn send_request_inner(
305        &mut self,
306        user_key: &UserKey,
307        channel_kind: &ChannelKind,
308        request_box: Box<dyn Message>,
309    ) -> Result<GlobalRequestId, NaiaServerError> {
310        let channel_settings = self.protocol.channel_kinds.channel(&channel_kind);
311
312        if !channel_settings.can_request_and_respond() {
313            panic!("Requests can only be sent over Bidirectional, Reliable Channels");
314        }
315
316        let request_id = self.global_request_manager.create_request_id(user_key);
317
318        let Some(user) = self.users.get(user_key) else {
319            warn!("user does not exist");
320            return Err(NaiaServerError::Message("user does not exist".to_string()));
321        };
322        if !user.has_address() {
323            warn!("currently not connected to user");
324            return Err(NaiaServerError::Message(
325                "currently not connected to user".to_string(),
326            ));
327        }
328        let Some(connection) = self.user_connections.get_mut(&user.address()) else {
329            warn!("currently not connected to user");
330            return Err(NaiaServerError::Message(
331                "currently not connected to user".to_string(),
332            ));
333        };
334        let mut converter = EntityConverterMut::new(
335            &self.global_world_manager,
336            &mut connection.base.local_world_manager,
337        );
338
339        let message = MessageContainer::from_write(request_box, &mut converter);
340        connection.base.message_manager.send_request(
341            &self.protocol.message_kinds,
342            &mut converter,
343            channel_kind,
344            request_id,
345            message,
346        );
347
348        return Ok(request_id);
349    }
350
351    /// Sends a Response for a given Request. Returns whether or not was successful.
352    pub fn send_response<S: Response>(
353        &mut self,
354        response_key: &ResponseSendKey<S>,
355        response: &S,
356    ) -> bool {
357        let response_id = response_key.response_id();
358
359        let cloned_response = S::clone_box(response);
360
361        self.send_response_inner(&response_id, cloned_response)
362    }
363
364    // returns whether was successful
365    fn send_response_inner(
366        &mut self,
367        response_id: &GlobalResponseId,
368        response_box: Box<dyn Message>,
369    ) -> bool {
370        let Some((user_key, channel_kind, local_response_id)) = self
371            .global_response_manager
372            .destroy_response_id(&response_id)
373        else {
374            return false;
375        };
376        let Some(user) = self.users.get(&user_key) else {
377            return false;
378        };
379        if !user.has_address() {
380            warn!("currently not connected to user");
381            return false;
382        }
383        let Some(connection) = self.user_connections.get_mut(&user.address()) else {
384            return false;
385        };
386        let mut converter = EntityConverterMut::new(
387            &self.global_world_manager,
388            &mut connection.base.local_world_manager,
389        );
390        let response = MessageContainer::from_write(response_box, &mut converter);
391        connection.base.message_manager.send_response(
392            &self.protocol.message_kinds,
393            &mut converter,
394            &channel_kind,
395            local_response_id,
396            response,
397        );
398        return true;
399    }
400
401    pub fn receive_response<S: Response>(
402        &mut self,
403        response_key: &ResponseReceiveKey<S>,
404    ) -> Option<(UserKey, S)> {
405        let request_id = response_key.request_id();
406        let Some((user_key, container)) =
407            self.global_request_manager.destroy_request_id(&request_id)
408        else {
409            return None;
410        };
411        let response: S = Box::<dyn Any + 'static>::downcast::<S>(container.to_boxed_any())
412            .ok()
413            .map(|boxed_s| *boxed_s)
414            .unwrap();
415        return Some((user_key, response));
416    }
417    //
418
419    pub fn receive_tick_buffer_messages(&mut self, tick: &Tick) -> TickBufferMessages {
420        let mut tick_buffer_messages = TickBufferMessages::new();
421        for (_user_address, connection) in self.user_connections.iter_mut() {
422            // receive messages from anyone
423            connection.tick_buffer_messages(tick, &mut tick_buffer_messages);
424        }
425        tick_buffer_messages
426    }
427
428    // Updates
429
430    /// Used to evaluate whether, given a User & Entity that are in the
431    /// same Room, said Entity should be in scope for the given User.
432    ///
433    /// While Rooms allow for a very simple scope to which an Entity can belong,
434    /// this provides complete customization for advanced scopes.
435    ///
436    /// Return a collection of Entity Scope Sets, being a unique combination of
437    /// a related Room, User, and Entity, used to determine which Entities to
438    /// replicate to which Users
439    pub fn scope_checks(&self) -> Vec<(RoomKey, UserKey, E)> {
440        let mut list: Vec<(RoomKey, UserKey, E)> = Vec::new();
441
442        // TODO: precache this, instead of generating a new list every call
443        // likely this is called A LOT
444        for (room_key, room) in self.rooms.iter() {
445            for user_key in room.user_keys() {
446                for entity in room.entities() {
447                    list.push((room_key, *user_key, *entity));
448                }
449            }
450        }
451
452        list
453    }
454
455    /// Sends all update messages to all Clients. If you don't call this
456    /// method, the Server will never communicate with it's connected
457    /// Clients
458    pub fn send_all_updates<W: WorldRefType<E>>(&mut self, world: W) {
459        let now = Instant::now();
460
461        // update entity scopes
462        self.update_entity_scopes(&world);
463
464        // loop through all connections, send packet
465        let mut user_addresses: Vec<SocketAddr> = self.user_connections.keys().copied().collect();
466
467        // shuffle order of connections in order to avoid priority among users
468        fastrand::shuffle(&mut user_addresses);
469
470        for user_address in user_addresses {
471            let connection = self.user_connections.get_mut(&user_address).unwrap();
472
473            connection.send_packets(
474                &self.protocol,
475                &now,
476                &mut self.io,
477                &world,
478                &self.global_world_manager,
479                &self.time_manager,
480            );
481        }
482    }
483
484    // Entities
485
486    /// Creates a new Entity and returns an EntityMut which can be used for
487    /// further operations on the Entity
488    pub fn spawn_entity<W: WorldMutType<E>>(&mut self, mut world: W) -> EntityMut<E, W> {
489        let entity = world.spawn_entity();
490        self.spawn_entity_inner(&entity);
491
492        EntityMut::new(self, world, &entity)
493    }
494
    /// Registers an already existing Entity with the global world manager as a
    /// Server-owned entity. Does not touch the world itself.
    fn spawn_entity_inner(&mut self, entity: &E) {
        self.global_world_manager
            .spawn_entity_record(entity, EntityOwner::Server);
    }
500
501    /// This is used only for Hecs/Bevy adapter crates, do not use otherwise!
502    pub fn enable_entity_replication(&mut self, entity: &E) {
503        self.spawn_entity_inner(&entity);
504    }
505
    /// This is used only for Hecs/Bevy adapter crates, do not use otherwise!
    ///
    /// Delegates to `despawn_entity_worldless`, which removes the Entity from
    /// connections and from the Server's inner tracking.
    pub fn disable_entity_replication(&mut self, entity: &E) {
        // Despawn from connections and inner tracking
        self.despawn_entity_worldless(entity);
    }
511
    /// Asks the global world manager to pause replication of the given Entity.
    pub fn pause_entity_replication(&mut self, entity: &E) {
        self.global_world_manager.pause_entity_replication(entity);
    }
515
    /// Asks the global world manager to resume replication of the given Entity.
    pub fn resume_entity_replication(&mut self, entity: &E) {
        self.global_world_manager.resume_entity_replication(entity);
    }
519
    /// This is used only for Hecs/Bevy adapter crates, do not use otherwise!
    ///
    /// Returns the replication config the global world manager holds for the
    /// Entity, or `None` if it has none.
    pub fn entity_replication_config(&self, entity: &E) -> Option<ReplicationConfig> {
        self.global_world_manager.entity_replication_config(entity)
    }
524
525    /// This is used only for Hecs/Bevy adapter crates, do not use otherwise!
526    pub fn entity_take_authority(&mut self, entity: &E) {
527        let did_change = self.global_world_manager.server_take_authority(entity);
528
529        if did_change {
530            self.send_reset_authority_messages(entity);
531            self.incoming_events.push_auth_reset(entity);
532        }
533    }
534
535    fn send_reset_authority_messages(&mut self, entity: &E) {
536        // authority was released from entity
537        // for any users that have this entity in scope, send an `update_authority_status` message
538
539        // TODO: we can make this more efficient in the future by caching which Entities
540        // are in each User's scope
541        let mut messages_to_send = Vec::new();
542        for (user_key, user) in self.users.iter() {
543            if !user.has_address() {
544                continue;
545            }
546            if let Some(connection) = self.user_connections.get_mut(&user.address()) {
547                if connection.base.host_world_manager.host_has_entity(entity) {
548                    let message = EntityEventMessage::new_update_auth_status(
549                        &self.global_world_manager,
550                        entity,
551                        EntityAuthStatus::Available,
552                    );
553                    messages_to_send.push((user_key, message));
554                }
555
556                // Clean up any remote entity that was mapped to the delegated host entity in this connection!
557                if connection
558                    .base
559                    .local_world_manager
560                    .has_both_host_and_remote_entity(entity)
561                {
562                    Self::remove_redundant_remote_entity_from_host(connection, entity);
563                }
564            }
565        }
566        for (user_key, message) in messages_to_send {
567            self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
568        }
569    }
570
    /// Changes the replication config of an Entity that the Server already tracks,
    /// performing the work needed to move from its current config to `config`.
    ///
    /// Transitions and the calls they make:
    /// - Private -> Public: `publish_entity`
    /// - Private -> Delegated: `publish_entity`, then `entity_enable_delegation`
    /// - Public -> Private: `unpublish_entity`
    /// - Public -> Delegated: `entity_enable_delegation`
    /// - Delegated -> Private: `entity_disable_delegation`, then `unpublish_entity`
    /// - Delegated -> Public: `entity_disable_delegation`
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - the Entity is not tracked by the global world manager,
    /// - `config` equals the Entity's current config,
    /// - a Server-owned Entity is currently Private, or is asked to become Private,
    /// - a Client-owned Entity is currently Delegated, or is asked to become Delegated.
    pub fn configure_entity_replication<W: WorldMutType<E>>(
        &mut self,
        world: &mut W,
        entity: &E,
        config: ReplicationConfig,
    ) {
        if !self.global_world_manager.has_entity(entity) {
            panic!("Entity is not yet replicating. Be sure to call `enable_replication` or `spawn_entity` on the Server, before configuring replication.");
        }
        // The entity is known to exist at this point, so both lookups below are unwrapped.
        let entity_owner = self.global_world_manager.entity_owner(entity).unwrap();
        let server_owned: bool = entity_owner.is_server();
        let client_owned: bool = entity_owner.is_client();
        let next_config = config;
        let prev_config = self
            .global_world_manager
            .entity_replication_config(entity)
            .unwrap();
        // Setting the same config twice is treated as a caller error. This also makes
        // the "same -> same" arms of the matches below unreachable.
        if prev_config == config {
            panic!(
                "Entity replication config is already set to {:?}. Should not set twice.",
                config
            );
        }

        match prev_config {
            ReplicationConfig::Private => {
                if server_owned {
                    panic!("Server-owned entity should never be private");
                }
                match next_config {
                    ReplicationConfig::Private => {
                        panic!("Should not be able to happen");
                    }
                    ReplicationConfig::Public => {
                        // private -> public
                        self.publish_entity(world, entity, true);
                    }
                    ReplicationConfig::Delegated => {
                        // private -> delegated
                        if client_owned {
                            panic!("Cannot downgrade Client's ownership of Entity to Delegated. Do this Client-side if needed.");
                            // The reasoning here is that the Client's ownership should be respected.
                            // Yes the Server typically has authority over all things, but I believe this will enforce better standards.
                        }
                        self.publish_entity(world, entity, true);
                        self.entity_enable_delegation(world, entity, None);
                    }
                }
            }
            ReplicationConfig::Public => {
                match next_config {
                    ReplicationConfig::Private => {
                        // public -> private
                        if server_owned {
                            panic!("Cannot unpublish a Server-owned Entity (doing so would disable replication entirely, just use a local entity instead)");
                        }
                        self.unpublish_entity(world, entity, true);
                    }
                    ReplicationConfig::Public => {
                        panic!("Should not be able to happen");
                    }
                    ReplicationConfig::Delegated => {
                        // public -> delegated
                        if client_owned {
                            panic!("Cannot downgrade Client's ownership of Entity to Delegated. Do this Client-side if needed.");
                            // The reasoning here is that the Client's ownership should be respected.
                            // Yes the Server typically has authority over all things, but I believe this will enforce better standards.
                        }
                        self.entity_enable_delegation(world, entity, None);
                    }
                }
            }
            ReplicationConfig::Delegated => {
                if client_owned {
                    panic!("Client-owned entity should never be delegated");
                }
                match next_config {
                    ReplicationConfig::Private => {
                        // delegated -> private
                        if server_owned {
                            panic!("Cannot unpublish a Server-owned Entity (doing so would disable replication entirely, just use a local entity instead)");
                        }
                        // Delegation is torn down before the entity is unpublished.
                        self.entity_disable_delegation(world, entity);
                        self.unpublish_entity(world, entity, true);
                    }
                    ReplicationConfig::Public => {
                        // delegated -> public
                        self.entity_disable_delegation(world, entity);
                    }
                    ReplicationConfig::Delegated => {
                        panic!("Should not be able to happen");
                    }
                }
            }
        }
    }
667
668    /// This is used only for Hecs/Bevy adapter crates, do not use otherwise!
669    pub fn client_request_authority(
670        &mut self,
671        origin_user: &UserKey,
672        world_entity: &E,
673        remote_entity: &RemoteEntity,
674    ) {
675        let requester = AuthOwner::Client(*origin_user);
676        let success = self
677            .global_world_manager
678            .client_request_authority(&world_entity, &requester);
679        if success {
680            // entity authority was granted for origin user
681
682            self.add_redundant_remote_entity_to_host(origin_user, world_entity, remote_entity);
683
684            // for any users that have this entity in scope, send an `update_authority_status` message
685
686            // TODO: we can make this more efficient in the future by caching which Entities
687            // are in each User's scope
688            let mut messages_to_send = Vec::new();
689            for (user_key, user) in self.users.iter() {
690                if !user.has_address() {
691                    continue;
692                }
693                if let Some(connection) = self.user_connections.get(&user.address()) {
694                    if connection
695                        .base
696                        .host_world_manager
697                        .host_has_entity(world_entity)
698                    {
699                        let mut new_status: EntityAuthStatus = EntityAuthStatus::Denied;
700                        if *origin_user == user_key {
701                            new_status = EntityAuthStatus::Granted;
702                        }
703
704                        // if new_status == EntityAuthStatus::Denied {
705                        //     warn!("Denying status of entity to user: `{:?}`", user_key);
706                        // } else {
707                        //     warn!("Granting status of entity to user: `{:?}`", user_key);
708                        // }
709
710                        let message = EntityEventMessage::new_update_auth_status(
711                            &self.global_world_manager,
712                            world_entity,
713                            new_status,
714                        );
715
716                        messages_to_send.push((user_key, message));
717                    }
718                }
719            }
720            for (user_key, message) in messages_to_send {
721                self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
722            }
723
724            self.incoming_events
725                .push_auth_grant(origin_user, &world_entity);
726        } else {
727            panic!("Failed to request authority for entity");
728        }
729    }
730
731    fn entity_enable_delegation_response(&mut self, user_key: &UserKey, entity: &E) {
732        if self.global_world_manager.entity_is_delegated(entity) {
733            let Some(auth_status) = self.global_world_manager.entity_authority_status(entity)
734            else {
735                panic!("Entity should have an Auth status if it is delegated..")
736            };
737            if auth_status != EntityAuthStatus::Available {
738                let message = EntityEventMessage::new_update_auth_status(
739                    &self.global_world_manager,
740                    entity,
741                    auth_status,
742                );
743                self.send_message::<SystemChannel, EntityEventMessage>(user_key, &message);
744            }
745        }
746    }
747
    /// This is used only for Hecs/Bevy adapter crates, do not use otherwise!
    ///
    /// Returns the authority status the global world manager holds for the
    /// Entity, or `None` if it has none.
    pub fn entity_authority_status(&self, entity: &E) -> Option<EntityAuthStatus> {
        self.global_world_manager.entity_authority_status(entity)
    }
752
753    fn add_redundant_remote_entity_to_host(
754        &mut self,
755        user_key: &UserKey,
756        world_entity: &E,
757        remote_entity: &RemoteEntity,
758    ) {
759        if let Some(user) = self.users.get(user_key) {
760            if !user.has_address() {
761                return;
762            }
763            let connection = self.user_connections.get_mut(&user.address()).unwrap();
764
765            // Local World Manager now tracks the Entity by it's Remote Entity
766            connection
767                .base
768                .local_world_manager
769                .insert_remote_entity(world_entity, *remote_entity);
770
771            // Remote world reader needs to track remote entity too
772            let component_kinds = self
773                .global_world_manager
774                .component_kinds(world_entity)
775                .unwrap();
776            connection
777                .base
778                .remote_world_reader
779                .track_hosts_redundant_remote_entity(remote_entity, &component_kinds);
780        }
781    }
782
783    fn remove_redundant_remote_entity_from_host(connection: &mut Connection<E>, world_entity: &E) {
784        let remote_entity = connection
785            .base
786            .local_world_manager
787            .remove_redundant_remote_entity(world_entity);
788        connection
789            .base
790            .remote_world_reader
791            .untrack_hosts_redundant_remote_entity(&remote_entity);
792        connection
793            .base
794            .remote_world_manager
795            .on_entity_channel_closing(&remote_entity);
796    }
797
798    /// This is used only for Hecs/Bevy adapter crates, do not use otherwise!
799    pub fn entity_release_authority(&mut self, origin_user: Option<&UserKey>, entity: &E) {
800        let releaser = AuthOwner::from_user_key(origin_user);
801        let success = self
802            .global_world_manager
803            .client_release_authority(&entity, &releaser);
804        if success {
805            self.send_reset_authority_messages(entity);
806        }
807    }
808
809    /// Retrieves an EntityRef that exposes read-only operations for the
810    /// Entity.
811    /// Panics if the Entity does not exist.
812    pub fn entity<W: WorldRefType<E>>(&self, world: W, entity: &E) -> EntityRef<E, W> {
813        if world.has_entity(entity) {
814            return EntityRef::new(self, world, entity);
815        }
816        panic!("No Entity exists for given Key!");
817    }
818
819    /// Retrieves an EntityMut that exposes read and write operations for the
820    /// Entity.
821    /// Panics if the Entity does not exist.
822    pub fn entity_mut<W: WorldMutType<E>>(&mut self, world: W, entity: &E) -> EntityMut<E, W> {
823        if world.has_entity(entity) {
824            return EntityMut::new(self, world, entity);
825        }
826        panic!("No Entity exists for given Key!");
827    }
828
    /// Gets a Vec of all Entities in the given World
    ///
    /// This returns `world.entities()` directly; no Server state is consulted.
    pub fn entities<W: WorldRefType<E>>(&self, world: W) -> Vec<E> {
        world.entities()
    }
833
834    pub fn entity_owner(&self, entity: &E) -> EntityOwner {
835        if let Some(owner) = self.global_world_manager.entity_owner(entity) {
836            return owner;
837        }
838        return EntityOwner::Local;
839    }
840
841    // Users
842
    /// Returns whether or not a User exists for the given UserKey
    pub fn user_exists(&self, user_key: &UserKey) -> bool {
        self.users.contains_key(user_key)
    }
847
848    /// Retrieves an UserRef that exposes read-only operations for the User
849    /// associated with the given UserKey.
850    /// Panics if the user does not exist.
851    pub fn user(&self, user_key: &UserKey) -> UserRef<E> {
852        if self.users.contains_key(user_key) {
853            return UserRef::new(self, user_key);
854        }
855        panic!("No User exists for given Key!");
856    }
857
858    /// Retrieves an UserMut that exposes read and write operations for the User
859    /// associated with the given UserKey.
860    /// Returns None if the user does not exist.
861    pub fn user_mut(&mut self, user_key: &UserKey) -> UserMut<E> {
862        if self.users.contains_key(user_key) {
863            return UserMut::new(self, user_key);
864        }
865        panic!("No User exists for given Key!");
866    }
867
868    /// Return a list of all currently connected Users' keys
869    pub fn user_keys(&self) -> Vec<UserKey> {
870        let mut output = Vec::new();
871
872        for (user_key, user) in self.users.iter() {
873            if !user.has_address() {
874                continue;
875            }
876            if self.user_connections.contains_key(&user.address()) {
877                output.push(user_key);
878            }
879        }
880
881        output
882    }
883
    /// Get the number of Users currently tracked by the Server
    ///
    /// NOTE(review): this is the size of the whole `users` collection, so it
    /// also counts Users that `user_keys` skips (no address, or no connection
    /// yet) — confirm whether callers expect connected Users only.
    pub fn users_count(&self) -> usize {
        self.users.len()
    }
888
889    /// Returns a UserScopeRef, which is used to query whether a given user has
890    pub fn user_scope(&self, user_key: &UserKey) -> UserScopeRef<E> {
891        if self.users.contains_key(user_key) {
892            return UserScopeRef::new(self, user_key);
893        }
894        panic!("No User exists for given Key!");
895    }
896
897    /// Returns a UserScopeMut, which is used to include/exclude Entities for a
898    /// given User
899    pub fn user_scope_mut(&mut self, user_key: &UserKey) -> UserScopeMut<E> {
900        if self.users.contains_key(user_key) {
901            return UserScopeMut::new(self, user_key);
902        }
903        panic!("No User exists for given Key!");
904    }
905
906    // Rooms
907
908    /// Creates a new Room on the Server and returns a corresponding RoomMut,
909    /// which can be used to add users/entities to the room or retrieve its
910    /// key
911    pub fn make_room(&mut self) -> RoomMut<E> {
912        let new_room = Room::new();
913        let room_key = self.rooms.insert(new_room);
914        RoomMut::new(self, &room_key)
915    }
916
    /// Returns whether or not a Room exists for the given RoomKey
    ///
    /// This is the same check `room` and `room_mut` perform before handing out
    /// a handle.
    pub fn room_exists(&self, room_key: &RoomKey) -> bool {
        self.rooms.contains_key(room_key)
    }
921
922    /// Retrieves an RoomMut that exposes read and write operations for the
923    /// Room associated with the given RoomKey.
924    /// Panics if the room does not exist.
925    pub fn room(&self, room_key: &RoomKey) -> RoomRef<E> {
926        if self.rooms.contains_key(room_key) {
927            return RoomRef::new(self, room_key);
928        }
929        panic!("No Room exists for given Key!");
930    }
931
932    /// Retrieves an RoomMut that exposes read and write operations for the
933    /// Room associated with the given RoomKey.
934    /// Panics if the room does not exist.
935    pub fn room_mut(&mut self, room_key: &RoomKey) -> RoomMut<E> {
936        if self.rooms.contains_key(room_key) {
937            return RoomMut::new(self, room_key);
938        }
939        panic!("No Room exists for given Key!");
940    }
941
942    /// Return a list of all the Server's Rooms' keys
943    pub fn room_keys(&self) -> Vec<RoomKey> {
944        let mut output = Vec::new();
945
946        for (key, _) in self.rooms.iter() {
947            output.push(key);
948        }
949
950        output
951    }
952
    /// Get a count of how many Rooms currently exist
    ///
    /// This is the length of the Server's room collection.
    pub fn rooms_count(&self) -> usize {
        self.rooms.len()
    }
957
958    // Ticks
959
960    /// Gets the current tick of the Server
961    pub fn current_tick(&self) -> Tick {
962        return self.time_manager.current_tick();
963    }
964
    /// Gets the current average tick duration of the Server, as reported by
    /// the time manager
    pub fn average_tick_duration(&self) -> Duration {
        self.time_manager.average_tick_duration()
    }
969
970    // Bandwidth monitoring
    /// Returns the total outgoing bandwidth figure reported by the Server's `io`.
    ///
    /// NOTE(review): the unit of the returned value is not visible here —
    /// confirm against the bandwidth monitor.
    pub fn outgoing_bandwidth_total(&mut self) -> f32 {
        self.io.outgoing_bandwidth_total()
    }
974
    /// Returns the total incoming bandwidth figure reported by the Server's `io`.
    ///
    /// NOTE(review): the unit of the returned value is not visible here —
    /// confirm against the bandwidth monitor.
    pub fn incoming_bandwidth_total(&mut self) -> f32 {
        self.io.incoming_bandwidth_total()
    }
978
    /// Returns the outgoing bandwidth figure the Server's `io` reports for the
    /// client at the given address.
    ///
    /// NOTE(review): behavior for an address that was never registered is not
    /// visible here — confirm before passing arbitrary addresses.
    pub fn outgoing_bandwidth_to_client(&mut self, address: &SocketAddr) -> f32 {
        self.io.outgoing_bandwidth_to_client(address)
    }
982
    /// Returns the incoming bandwidth figure the Server's `io` reports for the
    /// client at the given address.
    ///
    /// NOTE(review): behavior for an address that was never registered is not
    /// visible here — confirm before passing arbitrary addresses.
    pub fn incoming_bandwidth_from_client(&mut self, address: &SocketAddr) -> f32 {
        self.io.incoming_bandwidth_from_client(address)
    }
986
987    // Ping
988    /// Gets the average Round Trip Time measured to the given User's Client
989    pub fn rtt(&self, user_key: &UserKey) -> Option<f32> {
990        if let Some(user) = self.users.get(user_key) {
991            if !user.has_address() {
992                return None;
993            }
994            if let Some(connection) = self.user_connections.get(&user.address()) {
995                return Some(connection.ping_manager.rtt_average);
996            }
997        }
998        None
999    }
1000
1001    /// Gets the average Jitter measured in connection to the given User's
1002    /// Client
1003    pub fn jitter(&self, user_key: &UserKey) -> Option<f32> {
1004        if let Some(user) = self.users.get(user_key) {
1005            if !user.has_address() {
1006                return None;
1007            }
1008            if let Some(connection) = self.user_connections.get(&user.address()) {
1009                return Some(connection.ping_manager.jitter_average);
1010            }
1011        }
1012        None
1013    }
1014
1015    // Crate-Public methods
1016
1017    //// Entities
1018
1019    /// Despawns the Entity, if it exists.
1020    /// This will also remove all of the Entity’s Components.
1021    /// Panics if the Entity does not exist.
1022    pub(crate) fn despawn_entity<W: WorldMutType<E>>(&mut self, world: &mut W, entity: &E) {
1023        if !world.has_entity(entity) {
1024            panic!("attempted to de-spawn nonexistent entity");
1025        }
1026
1027        // Delete from world
1028        world.despawn_entity(entity);
1029
1030        self.despawn_entity_worldless(entity);
1031    }
1032
1033    pub fn despawn_entity_worldless(&mut self, entity: &E) {
1034        if !self.global_world_manager.has_entity(entity) {
1035            info!("attempting to despawn entity that does not exist, this can happen if a delegated entity is being despawned");
1036            return;
1037        }
1038        self.cleanup_entity_replication(entity);
1039        self.global_world_manager.remove_entity_record(entity);
1040    }
1041
1042    fn cleanup_entity_replication(&mut self, entity: &E) {
1043        self.despawn_entity_from_all_connections(entity);
1044
1045        // Delete scope
1046        self.entity_scope_map.remove_entity(entity);
1047
1048        // Delete room cache entry
1049        if let Some(room_keys) = self.entity_room_map.remove_from_all_rooms(entity) {
1050            for room_key in room_keys {
1051                if let Some(room) = self.rooms.get_mut(&room_key) {
1052                    room.remove_entity(entity, true);
1053                }
1054            }
1055        }
1056
1057        // Remove from ECS Record
1058        self.global_world_manager
1059            .remove_entity_diff_handlers(entity);
1060    }
1061
1062    fn despawn_entity_from_all_connections(&mut self, entity: &E) {
1063        // TODO: we can make this more efficient in the future by caching which Entities
1064        // are in each User's scope
1065        for (_, connection) in self.user_connections.iter_mut() {
1066            if connection.base.host_world_manager.host_has_entity(entity) {
1067                //remove entity from user connection
1068                connection.base.host_world_manager.despawn_entity(entity);
1069            }
1070        }
1071    }
1072
1073    //// Entity Scopes
1074
    /// Remove all entities from a User's scope
    ///
    /// Delegates to the entity scope map's `remove_user` for the given UserKey.
    pub(crate) fn user_scope_remove_user(&mut self, user_key: &UserKey) {
        self.entity_scope_map.remove_user(user_key);
    }
1079
    /// Stores `is_contained` in the entity scope map for the given User/Entity
    /// pair; `user_scope_has_entity` later reads this flag back.
    pub(crate) fn user_scope_set_entity(
        &mut self,
        user_key: &UserKey,
        entity: &E,
        is_contained: bool,
    ) {
        self.entity_scope_map
            .insert(*user_key, *entity, is_contained);
    }
1089
1090    pub(crate) fn user_scope_has_entity(&self, user_key: &UserKey, entity: &E) -> bool {
1091        if let Some(in_scope) = self.entity_scope_map.get(user_key, entity) {
1092            *in_scope
1093        } else {
1094            false
1095        }
1096    }
1097
1098    //// Components
1099
1100    /// Adds a Component to an Entity
1101    pub(crate) fn insert_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1102        &mut self,
1103        world: &mut W,
1104        entity: &E,
1105        mut component: R,
1106    ) {
1107        if !world.has_entity(entity) {
1108            panic!("attempted to add component to non-existent entity");
1109        }
1110
1111        let component_kind = component.kind();
1112
1113        if world.has_component_of_kind(entity, &component_kind) {
1114            // Entity already has this Component type yet, update Component
1115
1116            let Some(mut component_mut) = world.component_mut::<R>(entity) else {
1117                panic!("Should never happen because we checked for this above");
1118            };
1119            component_mut.mirror(&component);
1120        } else {
1121            // Entity does not have this Component type yet, initialize Component
1122            self.insert_component_worldless(entity, &mut component);
1123
1124            // actually insert component into world
1125            world.insert_component(entity, component);
1126        }
1127    }
1128
1129    // This intended to be used by adapter crates, do not use this as it will not update the world
1130    pub fn insert_component_worldless(&mut self, entity: &E, component: &mut dyn Replicate) {
1131        let component_kind = component.kind();
1132
1133        if self
1134            .global_world_manager
1135            .has_component_record(entity, &component_kind)
1136        {
1137            warn!(
1138                "Attempted to add component `{:?}` to entity that already has it, this can happen if a delegated entity's auth is transferred to the Server before the Server Adapter has been able to process the newly inserted Component. Skipping this action.",
1139                component.name());
1140            return;
1141        }
1142
1143        self.insert_new_component_into_entity_scopes(entity, &component_kind, None);
1144
1145        // update in world manager
1146        self.global_world_manager
1147            .insert_component_record(entity, &component_kind);
1148        self.global_world_manager
1149            .insert_component_diff_handler(entity, component);
1150
1151        // if entity is delegated, convert over
1152        if self.global_world_manager.entity_is_delegated(entity) {
1153            let accessor = self.global_world_manager.get_entity_auth_accessor(entity);
1154            component.enable_delegation(&accessor, None)
1155        }
1156    }
1157
1158    fn insert_new_component_into_entity_scopes(
1159        &mut self,
1160        entity: &E,
1161        component_kind: &ComponentKind,
1162        excluding_user_opt: Option<&UserKey>,
1163    ) {
1164        let excluding_addr_opt: Option<SocketAddr> = {
1165            if let Some(user_key) = excluding_user_opt {
1166                if let Some(user) = self.users.get(user_key) {
1167                    if !user.has_address() {
1168                        None
1169                    } else {
1170                        Some(user.address())
1171                    }
1172                } else {
1173                    None
1174                }
1175            } else {
1176                None
1177            }
1178        };
1179        // add component to connections already tracking entity
1180        for (addr, connection) in self.user_connections.iter_mut() {
1181            if let Some(exclude_addr) = excluding_addr_opt {
1182                if addr == &exclude_addr {
1183                    continue;
1184                }
1185            }
1186
1187            // insert component into user's connection
1188            if connection.base.host_world_manager.host_has_entity(entity) {
1189                connection
1190                    .base
1191                    .host_world_manager
1192                    .insert_component(entity, &component_kind);
1193            }
1194        }
1195    }
1196
1197    /// Removes a Component from an Entity
1198    pub(crate) fn remove_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1199        &mut self,
1200        world: &mut W,
1201        entity: &E,
1202    ) -> Option<R> {
1203        self.remove_component_worldless(entity, &ComponentKind::of::<R>());
1204
1205        // remove from world
1206        world.remove_component::<R>(entity)
1207    }
1208
1209    // This intended to be used by adapter crates, do not use this as it will not update the world
1210    pub fn remove_component_worldless(&mut self, entity: &E, component_kind: &ComponentKind) {
1211        self.remove_component_from_all_connections(entity, component_kind);
1212
1213        // cleanup all other loose ends
1214        self.global_world_manager
1215            .remove_component_record(entity, &component_kind);
1216        self.global_world_manager
1217            .remove_component_diff_handler(entity, &component_kind);
1218    }
1219
1220    fn remove_component_from_all_connections(
1221        &mut self,
1222        entity: &E,
1223        component_kind: &ComponentKind,
1224    ) {
1225        // TODO: should be able to make this more efficient by caching for every Entity
1226        // which scopes they are part of
1227        for (_, connection) in self.user_connections.iter_mut() {
1228            if connection.base.host_world_manager.host_has_entity(entity) {
1229                // remove component from user connection
1230                connection
1231                    .base
1232                    .host_world_manager
1233                    .remove_component(entity, &component_kind);
1234            }
1235        }
1236    }
1237
1238    //// Authority
1239
1240    pub(crate) fn publish_entity<W: WorldMutType<E>>(
1241        &mut self,
1242        world: &mut W,
1243        entity: &E,
1244        server_origin: bool,
1245    ) -> bool {
1246        if server_origin {
1247            // send publish message to entity owner
1248            let entity_owner = self.global_world_manager.entity_owner(&entity);
1249            let Some(EntityOwner::Client(user_key)) = entity_owner else {
1250                panic!(
1251                    "Entity is not owned by a Client. Cannot publish entity. Owner is: {:?}",
1252                    entity_owner
1253                );
1254            };
1255            let message = EntityEventMessage::new_publish(&self.global_world_manager, entity);
1256            self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1257        }
1258
1259        let result = self.global_world_manager.entity_publish(&entity);
1260        if result {
1261            world.entity_publish(&self.global_world_manager, &entity);
1262        }
1263        return result;
1264    }
1265
1266    pub(crate) fn unpublish_entity<W: WorldMutType<E>>(
1267        &mut self,
1268        world: &mut W,
1269        entity: &E,
1270        server_origin: bool,
1271    ) {
1272        if server_origin {
1273            // send publish message to entity owner
1274            let entity_owner = self.global_world_manager.entity_owner(&entity);
1275            let Some(EntityOwner::ClientPublic(user_key)) = entity_owner else {
1276                panic!("Entity is not owned by a Client or is Private. Cannot publish entity. Owner is: {:?}", entity_owner);
1277            };
1278            let message = EntityEventMessage::new_unpublish(&self.global_world_manager, entity);
1279            self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1280        }
1281
1282        world.entity_unpublish(&entity);
1283        self.global_world_manager.entity_unpublish(&entity);
1284        self.cleanup_entity_replication(&entity);
1285    }
1286
1287    pub(crate) fn entity_enable_delegation<W: WorldMutType<E>>(
1288        &mut self,
1289        world: &mut W,
1290        entity: &E,
1291        client_origin: Option<UserKey>,
1292    ) {
1293        // TODO: check that entity is eligible for delegation?
1294
1295        {
1296            // For any users that have this entity in scope,
1297            // Send an `enable_delegation` message
1298
1299            // TODO: we can make this more efficient in the future by caching which Entities
1300            // are in each User's scope
1301            let mut messages_to_send = Vec::new();
1302            for (user_key, user) in self.users.iter() {
1303                if !user.has_address() {
1304                    continue;
1305                }
1306                if let Some(connection) = self.user_connections.get(&user.address()) {
1307                    if connection.base.host_world_manager.host_has_entity(entity) {
1308                        let message = EntityEventMessage::new_enable_delegation(
1309                            &self.global_world_manager,
1310                            entity,
1311                        );
1312                        messages_to_send.push((user_key, message));
1313                    }
1314                }
1315            }
1316            for (user_key, message) in messages_to_send {
1317                self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1318            }
1319        }
1320
1321        if let Some(client_key) = client_origin {
1322            self.enable_delegation_client_owned_entity(world, entity, &client_key);
1323        } else {
1324            self.global_world_manager.entity_enable_delegation(&entity);
1325            world.entity_enable_delegation(&self.global_world_manager, &entity);
1326        }
1327    }
1328
    /// Enables delegation for an Entity that is currently owned by a Client.
    ///
    /// Steps, in order:
    /// 1. Look up the Entity's owner. A `Client` owner is published on the spot
    ///    (with a warning); if that publish fails the function returns early.
    /// 2. Migrate the Entity to the Server in the global world manager and mark
    ///    it as in scope for the owning User.
    /// 3. Track the Entity in the owning User's host world manager and send
    ///    that User an entity-migrate response.
    /// 4. Enable delegation in the global world manager and the World.
    /// 5. Request authority over the Entity on behalf of `client_key`.
    ///
    /// Note that the owning User is taken from the Entity's owner record, while
    /// `client_key` is only used for the final authority request.
    ///
    /// # Panics
    /// Panics if the Entity has no owner, if the owner is neither `Client` nor
    /// `ClientPublic`, if the owning User, its address or its connection is
    /// missing, if the Entity has no component kinds recorded, or if the
    /// authority request fails.
    pub(crate) fn enable_delegation_client_owned_entity<W: WorldMutType<E>>(
        &mut self,
        world: &mut W,
        entity: &E,
        client_key: &UserKey,
    ) {
        let Some(entity_owner) = self.global_world_manager.entity_owner(entity) else {
            panic!("entity should have an owner at this point");
        };
        let owner_user_key;
        match entity_owner {
            EntityOwner::Client(user_key) => {
                owner_user_key = user_key;
                warn!(
                    "entity should be owned by a public client at this point. Owner is: {:?}",
                    entity_owner
                );

                // publishing here to ensure that the entity is in the correct state ..
                // TODO: this is probably a bad idea somehow! this is hacky
                // instead, should rely on client message coming through at the appropriate time to publish the entity before this..
                let result = self.global_world_manager.entity_publish(&entity);
                if result {
                    world.entity_publish(&self.global_world_manager, &entity);
                } else {
                    warn!("failed to publish entity before enabling delegation");
                    return;
                }
            }
            EntityOwner::ClientPublic(user_key) => {
                owner_user_key = user_key;
            }
            _owner => {
                panic!(
                    "entity should be owned by a public client at this point. Owner is: {:?}",
                    entity_owner
                );
            }
        }
        let user_key = owner_user_key;
        self.global_world_manager.migrate_entity_to_server(&entity);

        // we set this to true immediately since it's already being replicated out to the remote
        self.entity_scope_map.insert(user_key, *entity, true);

        // Migrate Entity from Remote -> Host connection
        let Some(user) = self.users.get(&user_key) else {
            panic!("user should exist");
        };
        if !user.has_address() {
            panic!("user should have an address");
        }
        let Some(connection) = self.user_connections.get_mut(&user.address()) else {
            panic!("connection does not exist")
        };
        let component_kinds = self.global_world_manager.component_kinds(entity).unwrap();

        // Add remote entity to Host World
        let new_host_entity = connection.base.host_world_manager.track_remote_entity(
            &mut connection.base.local_world_manager,
            entity,
            component_kinds,
        );

        // send response
        let message = EntityEventMessage::new_entity_migrate_response(
            &self.global_world_manager,
            &entity,
            new_host_entity,
        );
        self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);

        self.global_world_manager.entity_enable_delegation(&entity);
        world.entity_enable_delegation(&self.global_world_manager, &entity);

        // grant authority to user
        let requester = AuthOwner::from_user_key(Some(client_key));
        let success = self
            .global_world_manager
            .client_request_authority(&entity, &requester);
        if !success {
            panic!("failed to grant authority of client-owned delegated entity to creating user");
        }
    }
1413
1414    pub(crate) fn entity_disable_delegation<W: WorldMutType<E>>(
1415        &mut self,
1416        world: &mut W,
1417        entity: &E,
1418    ) {
1419        // TODO: check that entity is eligible for delegation?
1420        info!("server.entity_disable_delegation");
1421        // for any users that have this entity in scope, send an `disable_delegation` message
1422        {
1423            // TODO: we can make this more efficient in the future by caching which Entities
1424            // are in each User's scope
1425            let mut messages_to_send = Vec::new();
1426            for (user_key, user) in self.users.iter() {
1427                if !user.has_address() {
1428                    continue;
1429                }
1430                let connection = self.user_connections.get(&user.address()).unwrap();
1431                if connection.base.host_world_manager.host_has_entity(entity) {
1432                    let message = EntityEventMessage::new_disable_delegation(
1433                        &self.global_world_manager,
1434                        entity,
1435                    );
1436                    messages_to_send.push((user_key, message));
1437                }
1438            }
1439            for (user_key, message) in messages_to_send {
1440                self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1441            }
1442        }
1443
1444        self.global_world_manager.entity_disable_delegation(&entity);
1445        world.entity_disable_delegation(&entity);
1446    }
1447
1448    //// Users
1449
1450    /// Get a User's Socket Address, given the associated UserKey
1451    pub(crate) fn user_address(&self, user_key: &UserKey) -> Option<SocketAddr> {
1452        if let Some(user) = self.users.get(user_key) {
1453            if user.has_address() {
1454                return Some(user.address());
1455            }
1456        }
1457        None
1458    }
1459
1460    /// Returns an iterator of all the keys of the [`Room`]s the User belongs to
1461    pub(crate) fn user_room_keys(&self, user_key: &UserKey) -> Option<Iter<RoomKey>> {
1462        if let Some(user) = self.users.get(user_key) {
1463            return Some(user.room_keys().iter());
1464        }
1465        return None;
1466    }
1467
1468    /// Get an count of how many Rooms the given User is inside
1469    pub(crate) fn user_rooms_count(&self, user_key: &UserKey) -> Option<usize> {
1470        if let Some(user) = self.users.get(user_key) {
1471            return Some(user.room_count());
1472        }
1473        return None;
1474    }
1475
1476    pub(crate) fn user_disconnect<W: WorldMutType<E>>(
1477        &mut self,
1478        user_key: &UserKey,
1479        world: &mut W,
1480    ) {
1481        if self.protocol.client_authoritative_entities {
1482            self.despawn_all_remote_entities(user_key, world);
1483            if let Some(all_owned_entities) =
1484                self.global_world_manager.user_all_owned_entities(user_key)
1485            {
1486                let copied_entities = all_owned_entities.clone();
1487                for entity in copied_entities {
1488                    self.entity_release_authority(Some(user_key), &entity);
1489                }
1490            }
1491        }
1492        let user = self.user_delete(user_key);
1493        self.incoming_events.push_disconnection(user_key, user);
1494    }
1495
1496    pub(crate) fn user_queue_disconnect(&mut self, user_key: &UserKey) {
1497        let Some(user) = self.users.get(user_key) else {
1498            panic!("Attempting to disconnect a nonexistent user");
1499        };
1500        if !user.has_address() {
1501            panic!("Attempting to disconnect a nonexistent connection");
1502        }
1503        let Some(connection) = self.user_connections.get_mut(&user.address()) else {
1504            panic!("Attempting to disconnect a nonexistent connection");
1505        };
1506        connection.manual_disconnect = true;
1507    }
1508
1509    /// All necessary cleanup, when they're actually gone...
1510    pub(crate) fn despawn_all_remote_entities<W: WorldMutType<E>>(
1511        &mut self,
1512        user_key: &UserKey,
1513        world: &mut W,
1514    ) {
1515        let Some(user) = self.users.get(user_key) else {
1516            panic!("Attempting to despawn entities for a nonexistent user");
1517        };
1518        if !user.has_address() {
1519            return;
1520        }
1521        let Some(connection) = self.user_connections.get_mut(&user.address()) else {
1522            panic!("Attempting to despawn entities on a nonexistent connection");
1523        };
1524
1525        let remote_entities = connection.base.remote_entities();
1526        let entity_events = SharedGlobalWorldManager::<E>::despawn_all_entities(
1527            world,
1528            &self.global_world_manager,
1529            remote_entities,
1530        );
1531        let response_events = self
1532            .incoming_events
1533            .receive_entity_events(user_key, entity_events);
1534        self.process_response_events(world, user_key, response_events);
1535    }
1536
1537    pub(crate) fn user_delete(&mut self, user_key: &UserKey) -> User {
1538        let Some(user) = self.users.remove(user_key) else {
1539            panic!("Attempting to delete non-existant user!");
1540        };
1541
1542        if let Some(user_addr) = user.address_opt() {
1543            info!("deleting authenticated user for {}", user.address());
1544            self.user_connections.remove(&user_addr);
1545        }
1546
1547        self.entity_scope_map.remove_user(user_key);
1548
1549        self.handshake_manager
1550            .delete_user(user_key, user.address_opt());
1551
1552        // Clean up all user data
1553        for room_key in user.room_keys() {
1554            self.rooms
1555                .get_mut(room_key)
1556                .unwrap()
1557                .unsubscribe_user(user_key);
1558        }
1559
1560        // remove from bandwidth monitor
1561        if self.io.bandwidth_monitor_enabled() {
1562            self.io.deregister_client(&user.address());
1563        }
1564
1565        return user;
1566    }
1567
1568    //// Rooms
1569
1570    /// Deletes the Room associated with a given RoomKey on the Server.
1571    /// Returns true if the Room existed.
1572    pub(crate) fn room_destroy(&mut self, room_key: &RoomKey) -> bool {
1573        self.room_remove_all_entities(room_key);
1574
1575        if self.rooms.contains_key(room_key) {
1576            // TODO: what else kind of cleanup do we need to do here? Scopes?
1577
1578            // actually remove the room from the collection
1579            let room = self.rooms.remove(room_key).unwrap();
1580            for user_key in room.user_keys() {
1581                self.users.get_mut(user_key).unwrap().uncache_room(room_key);
1582            }
1583
1584            true
1585        } else {
1586            false
1587        }
1588    }
1589
1590    //////// users
1591
1592    /// Returns whether or not an User is currently in a specific Room, given
1593    /// their keys.
1594    pub(crate) fn room_has_user(&self, room_key: &RoomKey, user_key: &UserKey) -> bool {
1595        if let Some(room) = self.rooms.get(room_key) {
1596            return room.has_user(user_key);
1597        }
1598        false
1599    }
1600
1601    /// Add an User to a Room, given the appropriate RoomKey & UserKey
1602    /// Entities will only ever be in-scope for Users which are in a
1603    /// Room with them
1604    pub(crate) fn room_add_user(&mut self, room_key: &RoomKey, user_key: &UserKey) {
1605        if let Some(user) = self.users.get_mut(user_key) {
1606            if let Some(room) = self.rooms.get_mut(room_key) {
1607                room.subscribe_user(user_key);
1608                user.cache_room(room_key);
1609            }
1610        }
1611    }
1612
1613    /// Removes a User from a Room
1614    pub(crate) fn room_remove_user(&mut self, room_key: &RoomKey, user_key: &UserKey) {
1615        if let Some(user) = self.users.get_mut(user_key) {
1616            if let Some(room) = self.rooms.get_mut(room_key) {
1617                room.unsubscribe_user(user_key);
1618                user.uncache_room(room_key);
1619            }
1620        }
1621    }
1622
1623    /// Get a count of Users in a given Room
1624    pub(crate) fn room_users_count(&self, room_key: &RoomKey) -> usize {
1625        if let Some(room) = self.rooms.get(room_key) {
1626            return room.users_count();
1627        }
1628        0
1629    }
1630
1631    /// Returns an iterator of the [`UserKey`] for Users that belong in the Room
1632    pub(crate) fn room_user_keys(&self, room_key: &RoomKey) -> impl Iterator<Item = &UserKey> {
1633        let iter = if let Some(room) = self.rooms.get(room_key) {
1634            Some(room.user_keys())
1635        } else {
1636            None
1637        };
1638        iter.into_iter().flatten()
1639    }
1640
1641    pub(crate) fn room_entities(&self, room_key: &RoomKey) -> impl Iterator<Item = &E> {
1642        let iter = if let Some(room) = self.rooms.get(room_key) {
1643            Some(room.entities())
1644        } else {
1645            None
1646        };
1647        iter.into_iter().flatten()
1648    }
1649
1650    /// Sends a message to all connected users in a given Room using a given channel
1651    pub(crate) fn room_broadcast_message(
1652        &mut self,
1653        channel_kind: &ChannelKind,
1654        room_key: &RoomKey,
1655        message_box: Box<dyn Message>,
1656    ) {
1657        if let Some(room) = self.rooms.get(room_key) {
1658            let user_keys: Vec<UserKey> = room.user_keys().cloned().collect();
1659            for user_key in &user_keys {
1660                self.send_message_inner(user_key, channel_kind, message_box.clone())
1661            }
1662        }
1663    }
1664
1665    //////// entities
1666
1667    /// Returns whether or not an Entity is currently in a specific Room, given
1668    /// their keys.
1669    pub(crate) fn room_has_entity(&self, room_key: &RoomKey, entity: &E) -> bool {
1670        let Some(room) = self.rooms.get(room_key) else {
1671            return false;
1672        };
1673        return room.has_entity(entity);
1674    }
1675
1676    /// Add an Entity to a Room associated with the given RoomKey.
1677    /// Entities will only ever be in-scope for Users which are in a Room with
1678    /// them.
1679    pub(crate) fn room_add_entity(&mut self, room_key: &RoomKey, entity: &E) {
1680        let mut is_some = false;
1681        if let Some(room) = self.rooms.get_mut(room_key) {
1682            room.add_entity(entity);
1683            is_some = true;
1684        }
1685        if !is_some {
1686            return;
1687        }
1688        self.entity_room_map.entity_add_room(entity, room_key);
1689    }
1690
1691    /// Remove an Entity from a Room, associated with the given RoomKey
1692    pub(crate) fn room_remove_entity(&mut self, room_key: &RoomKey, entity: &E) {
1693        if let Some(room) = self.rooms.get_mut(room_key) {
1694            room.remove_entity(entity, false);
1695            self.entity_room_map.remove_from_room(entity, room_key);
1696        }
1697    }
1698
1699    /// Remove all Entities from a Room, associated with the given RoomKey
1700    fn room_remove_all_entities(&mut self, room_key: &RoomKey) {
1701        if let Some(room) = self.rooms.get_mut(room_key) {
1702            let entities: Vec<E> = room.entities().copied().collect();
1703            for entity in entities {
1704                room.remove_entity(&entity, false);
1705                self.entity_room_map.remove_from_room(&entity, room_key);
1706            }
1707        }
1708    }
1709
1710    /// Get a count of Entities in a given Room
1711    pub(crate) fn room_entities_count(&self, room_key: &RoomKey) -> usize {
1712        if let Some(room) = self.rooms.get(room_key) {
1713            return room.entities_count();
1714        }
1715        0
1716    }
1717
1718    // Private methods
1719
1720    /// Maintain connection with a client and read all incoming packet data
1721    fn maintain_socket<W: WorldMutType<E>>(&mut self, mut world: W, now: &Instant) {
1722        self.handle_disconnects(&mut world);
1723        self.handle_heartbeats();
1724        self.handle_pings();
1725        self.handle_empty_acks();
1726
1727        let mut addresses: HashSet<SocketAddr> = HashSet::new();
1728
1729        // receive auth events
1730        if let Some((_, auth_receiver)) = self.auth_io.as_mut() {
1731            loop {
1732                match auth_receiver.receive() {
1733                    Ok(Some((auth_addr, auth_bytes))) => {
1734                        // create new user
1735                        let user_key = self.users.insert(User::new(auth_addr));
1736
1737                        // convert bytes into auth object
1738                        let mut reader = BitReader::new(auth_bytes);
1739                        let Ok(auth_message) = self
1740                            .protocol
1741                            .message_kinds
1742                            .read(&mut reader, &FakeEntityConverter)
1743                        else {
1744                            warn!("Server Error: cannot read auth message");
1745                            continue;
1746                        };
1747
1748                        // send out event
1749                        self.incoming_events.push_auth(&user_key, auth_message);
1750                    }
1751                    Ok(None) => {
1752                        // No more auths, break loop
1753                        break;
1754                    }
1755                    Err(_) => {
1756                        self.incoming_events.push_error(NaiaServerError::RecvError);
1757                    }
1758                }
1759            }
1760        }
1761
1762        // receive socket events
1763        loop {
1764            match self.io.recv_reader() {
1765                Ok(Some((address, owned_reader))) => {
1766                    // receive packet
1767                    let mut reader = owned_reader.borrow();
1768
1769                    // read header
1770                    let Ok(header) = StandardHeader::de(&mut reader) else {
1771                        // Received a malformed packet
1772                        // TODO: increase suspicion against packet sender
1773                        continue;
1774                    };
1775
1776                    match header.packet_type {
1777                        PacketType::Data => {
1778                            addresses.insert(address);
1779
1780                            if self
1781                                .read_data_packet(&address, &header, &mut reader)
1782                                .is_err()
1783                            {
1784                                warn!("Server Error: cannot read malformed packet");
1785                                continue;
1786                            }
1787                        }
1788                        PacketType::Ping => {
1789                            let response = self.time_manager.process_ping(&mut reader).unwrap();
1790                            // send packet
1791                            if self.io.send_packet(&address, response.to_packet()).is_err() {
1792                                // TODO: pass this on and handle above
1793                                warn!("Server Error: Cannot send pong packet to {}", address);
1794                                continue;
1795                            };
1796
1797                            if let Some(connection) = self.user_connections.get_mut(&address) {
1798                                connection.process_incoming_header(&header);
1799                                connection.base.mark_heard();
1800                            }
1801
1802                            continue;
1803                        }
1804                        PacketType::Heartbeat => {
1805                            if let Some(connection) = self.user_connections.get_mut(&address) {
1806                                connection.process_incoming_header(&header);
1807                                connection.base.mark_heard();
1808                            }
1809
1810                            continue;
1811                        }
1812                        PacketType::Pong => {
1813                            if let Some(connection) = self.user_connections.get_mut(&address) {
1814                                connection.process_incoming_header(&header);
1815                                connection.base.mark_heard();
1816                                connection
1817                                    .ping_manager
1818                                    .process_pong(&self.time_manager, &mut reader);
1819                            }
1820
1821                            continue;
1822                        }
1823                        PacketType::Handshake => {
1824                            match self.handshake_manager.maintain_handshake(
1825                                &address,
1826                                &mut reader,
1827                                self.user_connections.contains_key(&address),
1828                            ) {
1829                                Ok(HandshakeAction::None) => {}
1830                                Ok(HandshakeAction::FinalizeConnection(
1831                                    user_key,
1832                                    validate_packet,
1833                                )) => {
1834                                    self.finalize_connection(&user_key, &address);
1835                                    if self.io.send_packet(&address, validate_packet).is_err() {
1836                                        // TODO: pass this on and handle above
1837                                        warn!(
1838                                            "Server Error: Cannot send validation packet to {}",
1839                                            &address
1840                                        );
1841                                    }
1842                                }
1843                                Ok(HandshakeAction::SendPacket(packet)) => {
1844                                    if self.io.send_packet(&address, packet).is_err() {
1845                                        // TODO: pass this on and handle above
1846                                        warn!("Server Error: Cannot send packet to {}", &address);
1847                                    }
1848                                }
1849                                Ok(HandshakeAction::DisconnectUser(user_key)) => {
1850                                    self.user_disconnect(&user_key, &mut world);
1851                                }
1852                                Err(_err) => {
1853                                    warn!("Server Error: cannot read malformed packet");
1854                                }
1855                            }
1856                        }
1857                    }
1858                }
1859                Ok(None) => {
1860                    // No more packets, break loop
1861                    break;
1862                }
1863                Err(error) => {
1864                    self.incoming_events
1865                        .push_error(NaiaServerError::Wrapped(Box::new(error)));
1866                }
1867            }
1868        }
1869
1870        for address in addresses {
1871            self.process_packets(&address, &mut world, now);
1872        }
1873    }
1874
1875    fn read_data_packet(
1876        &mut self,
1877        address: &SocketAddr,
1878        header: &StandardHeader,
1879        reader: &mut BitReader,
1880    ) -> Result<(), SerdeErr> {
1881        if header.packet_type != PacketType::Data {
1882            panic!("Server Error: received non-data packet in data packet handler");
1883        }
1884
1885        // Packets requiring established connection
1886        let Some(connection) = self.user_connections.get_mut(address) else {
1887            return Ok(());
1888        };
1889
1890        // Mark that we've heard from the client
1891        connection.base.mark_heard();
1892
1893        // Mark that we've received a data packet, to queue an empty ack
1894        connection.base.mark_should_send_empty_ack();
1895
1896        // Process incoming header
1897        connection.process_incoming_header(header);
1898
1899        // read client tick
1900        let client_tick = Tick::de(reader)?;
1901
1902        let server_tick = self.time_manager.current_tick();
1903
1904        // process data
1905        connection.read_packet(
1906            &self.protocol,
1907            server_tick,
1908            client_tick,
1909            reader,
1910            &mut self.global_world_manager,
1911        )?;
1912
1913        return Ok(());
1914    }
1915
1916    fn process_packets<W: WorldMutType<E>>(
1917        &mut self,
1918        address: &SocketAddr,
1919        world: &mut W,
1920        now: &Instant,
1921    ) {
1922        // Packets requiring established connection
1923        let (user_key, response_events) = {
1924            let Some(connection) = self.user_connections.get_mut(address) else {
1925                return;
1926            };
1927            (
1928                connection.user_key,
1929                connection.process_packets(
1930                    &self.protocol,
1931                    now,
1932                    &mut self.global_world_manager,
1933                    &mut self.global_request_manager,
1934                    &mut self.global_response_manager,
1935                    world,
1936                    &mut self.incoming_events,
1937                ),
1938            )
1939        };
1940        self.process_response_events(world, &user_key, response_events);
1941    }
1942
1943    fn process_response_events<W: WorldMutType<E>>(
1944        &mut self,
1945        world: &mut W,
1946        user_key: &UserKey,
1947        response_events: Vec<EntityResponseEvent<E>>,
1948    ) {
1949        let mut deferred_events = Vec::new();
1950        for response_event in response_events {
1951            match response_event {
1952                EntityResponseEvent::SpawnEntity(entity) => {
1953                    self.global_world_manager
1954                        .spawn_entity_record(&entity, EntityOwner::Client(*user_key));
1955                    let user = self.users.get(user_key).unwrap();
1956                    if !user.has_address() {
1957                        continue;
1958                    }
1959                    let connection = self.user_connections.get_mut(&user.address()).unwrap();
1960                    let local_entity = connection
1961                        .base
1962                        .local_world_manager
1963                        .entity_to_remote_entity(&entity)
1964                        .unwrap();
1965                    connection
1966                        .base
1967                        .remote_world_manager
1968                        .on_entity_channel_opened(&local_entity);
1969                }
1970                EntityResponseEvent::InsertComponent(entity, component_kind) => {
1971                    self.global_world_manager
1972                        .insert_component_record(&entity, &component_kind);
1973                    if self
1974                        .global_world_manager
1975                        .entity_is_public_and_client_owned(&entity)
1976                        || self.global_world_manager.entity_is_delegated(&entity)
1977                    {
1978                        world.component_publish(
1979                            &self.global_world_manager,
1980                            &entity,
1981                            &component_kind,
1982                        );
1983
1984                        if self.global_world_manager.entity_is_delegated(&entity) {
1985                            world.component_enable_delegation(
1986                                &self.global_world_manager,
1987                                &entity,
1988                                &component_kind,
1989                            );
1990
1991                            // track remote component on the originating connection for the time being
1992                            let user = self.users.get(user_key).unwrap();
1993                            if !user.has_address() {
1994                                continue;
1995                            }
1996                            let addr = user.address();
1997                            let connection = self.user_connections.get_mut(&addr).unwrap();
1998                            connection
1999                                .base
2000                                .host_world_manager
2001                                .track_remote_component(&entity, &component_kind);
2002                        }
2003
2004                        self.insert_new_component_into_entity_scopes(
2005                            &entity,
2006                            &component_kind,
2007                            Some(user_key),
2008                        );
2009                    }
2010                }
2011                EntityResponseEvent::RemoveComponent(entity, component_kind) => {
2012                    if self
2013                        .global_world_manager
2014                        .entity_is_public_and_client_owned(&entity)
2015                        || self.global_world_manager.entity_is_delegated(&entity)
2016                    {
2017                        self.remove_component_worldless(&entity, &component_kind);
2018                    } else {
2019                        self.global_world_manager
2020                            .remove_component_record(&entity, &component_kind);
2021                    }
2022                }
2023                _ => {
2024                    deferred_events.push(response_event);
2025                }
2026            }
2027        }
2028
2029        let mut extra_deferred_events = Vec::new();
2030        // The reason for deferring these events is that they depend on the operations to the world above
2031        for response_event in deferred_events {
2032            match response_event {
2033                EntityResponseEvent::PublishEntity(entity) => {
2034                    info!("received publish entity message!");
2035                    self.publish_entity(world, &entity, false);
2036                    self.incoming_events.push_publish(user_key, &entity);
2037                }
2038                EntityResponseEvent::UnpublishEntity(entity) => {
2039                    self.unpublish_entity(world, &entity, false);
2040                    self.incoming_events.push_unpublish(user_key, &entity);
2041                }
2042                EntityResponseEvent::EnableDelegationEntity(entity) => {
2043                    info!("received enable delegation entity message!");
2044                    self.entity_enable_delegation(world, &entity, Some(*user_key));
2045                    self.incoming_events.push_delegate(user_key, &entity);
2046                }
2047                EntityResponseEvent::EnableDelegationEntityResponse(entity) => {
2048                    self.entity_enable_delegation_response(user_key, &entity);
2049                }
2050                EntityResponseEvent::DisableDelegationEntity(_) => {
2051                    panic!("Clients should not be able to disable entity delegation.");
2052                }
2053                EntityResponseEvent::EntityRequestAuthority(world_entity, remote_entity) => {
2054                    self.client_request_authority(user_key, &world_entity, &remote_entity);
2055                }
2056                EntityResponseEvent::EntityReleaseAuthority(entity) => {
2057                    // info!("received release auth entity message!");
2058                    self.entity_release_authority(Some(user_key), &entity);
2059                    self.incoming_events.push_auth_reset(&entity);
2060                }
2061                EntityResponseEvent::EntityUpdateAuthority(_, _) => {
2062                    panic!("Clients should not be able to update entity authority.");
2063                }
2064                EntityResponseEvent::EntityMigrateResponse(_, _) => {
2065                    panic!("Clients should not be able to send this message");
2066                }
2067                _ => {
2068                    extra_deferred_events.push(response_event);
2069                }
2070            }
2071        }
2072
2073        for response_event in extra_deferred_events {
2074            match response_event {
2075                EntityResponseEvent::DespawnEntity(entity) => {
2076                    if self
2077                        .global_world_manager
2078                        .entity_is_public_and_client_owned(&entity)
2079                        || self.global_world_manager.entity_is_delegated(&entity)
2080                    {
2081                        // remove from host connection
2082                        let user = self.users.get(user_key).unwrap();
2083                        if !user.has_address() {
2084                            continue;
2085                        }
2086                        let connection = self.user_connections.get_mut(&user.address()).unwrap();
2087                        connection
2088                            .base
2089                            .host_world_manager
2090                            .client_initiated_despawn(&entity);
2091
2092                        self.despawn_entity_worldless(&entity);
2093                    } else {
2094                        self.global_world_manager.remove_entity_record(&entity);
2095                    }
2096                }
2097                _ => {
2098                    panic!("shouldn't happen");
2099                }
2100            }
2101        }
2102    }
2103
2104    fn handle_disconnects<W: WorldMutType<E>>(&mut self, world: &mut W) {
2105        // disconnects
2106        if self.timeout_timer.ringing() {
2107            self.timeout_timer.reset();
2108
2109            let mut user_disconnects: Vec<UserKey> = Vec::new();
2110
2111            for (_, connection) in &mut self.user_connections.iter_mut() {
2112                // user disconnects
2113                if connection.base.should_drop() || connection.manual_disconnect {
2114                    user_disconnects.push(connection.user_key);
2115                    continue;
2116                }
2117            }
2118
2119            for user_key in user_disconnects {
2120                self.user_disconnect(&user_key, world);
2121            }
2122        }
2123    }
2124
2125    fn handle_heartbeats(&mut self) {
2126        // heartbeats
2127        if self.heartbeat_timer.ringing() {
2128            self.heartbeat_timer.reset();
2129
2130            for (user_address, connection) in &mut self.user_connections.iter_mut() {
2131                // user heartbeats
2132                if connection.base.should_send_heartbeat() {
2133                    Self::send_heartbeat_packet(
2134                        user_address,
2135                        connection,
2136                        &self.time_manager,
2137                        &mut self.io,
2138                    );
2139                }
2140            }
2141        }
2142    }
2143
2144    fn handle_empty_acks(&mut self) {
2145        // empty acks
2146
2147        for (user_address, connection) in &mut self.user_connections.iter_mut() {
2148            if connection.base.should_send_empty_ack() {
2149                Self::send_heartbeat_packet(
2150                    user_address,
2151                    connection,
2152                    &self.time_manager,
2153                    &mut self.io,
2154                );
2155            }
2156        }
2157    }
2158
2159    fn send_heartbeat_packet(
2160        user_address: &SocketAddr,
2161        connection: &mut Connection<E>,
2162        time_manager: &TimeManager,
2163        io: &mut Io,
2164    ) {
2165        // Don't try to refactor this to self.internal_send, doesn't seem to
2166        // work cause of iter_mut()
2167        let mut writer = BitWriter::new();
2168
2169        // write header
2170        connection
2171            .base
2172            .write_header(PacketType::Heartbeat, &mut writer);
2173
2174        // write server tick
2175        time_manager.current_tick().ser(&mut writer);
2176
2177        // write server tick instant
2178        time_manager.current_tick_instant().ser(&mut writer);
2179
2180        // send packet
2181        if io.send_packet(user_address, writer.to_packet()).is_err() {
2182            // TODO: pass this on and handle above
2183            warn!(
2184                "Server Error: Cannot send heartbeat packet to {}",
2185                user_address
2186            );
2187        }
2188        connection.base.mark_sent();
2189    }
2190
2191    fn handle_pings(&mut self) {
2192        // pings
2193        if self.ping_timer.ringing() {
2194            self.ping_timer.reset();
2195
2196            for (user_address, connection) in &mut self.user_connections.iter_mut() {
2197                // send pings
2198                if connection.ping_manager.should_send_ping() {
2199                    let mut writer = BitWriter::new();
2200
2201                    // write header
2202                    connection.base.write_header(PacketType::Ping, &mut writer);
2203
2204                    // write server tick
2205                    self.time_manager.current_tick().ser(&mut writer);
2206
2207                    // write server tick instant
2208                    self.time_manager.current_tick_instant().ser(&mut writer);
2209
2210                    // write body
2211                    connection
2212                        .ping_manager
2213                        .write_ping(&mut writer, &self.time_manager);
2214
2215                    // send packet
2216                    if self
2217                        .io
2218                        .send_packet(user_address, writer.to_packet())
2219                        .is_err()
2220                    {
2221                        // TODO: pass this on and handle above
2222                        warn!("Server Error: Cannot send ping packet to {}", user_address);
2223                    }
2224                    connection.base.mark_sent();
2225                }
2226            }
2227        }
2228    }
2229
2230    // Entity Scopes
2231
2232    fn update_entity_scopes<W: WorldRefType<E>>(&mut self, world: &W) {
2233        for (_, room) in self.rooms.iter_mut() {
2234            while let Some((removed_user, removed_entity)) = room.pop_entity_removal_queue() {
2235                let Some(user) = self.users.get(&removed_user) else {
2236                    continue;
2237                };
2238                if !user.has_address() {
2239                    continue;
2240                }
2241                let Some(connection) = self.user_connections.get_mut(&user.address()) else {
2242                    continue;
2243                };
2244
2245                // evaluate whether the Entity really needs to be despawned!
2246                // what if the Entity shares another Room with this User? It shouldn't be despawned!
2247                if let Some(entity_rooms) = self.entity_room_map.entity_get_rooms(&removed_entity) {
2248                    let user_rooms = user.room_keys();
2249                    let has_room_in_common = entity_rooms.intersection(user_rooms).next().is_some();
2250                    if has_room_in_common {
2251                        continue;
2252                    }
2253                }
2254
2255                // check if host has entity, because it may have been removed from room before despawning, and we don't want to double despawn
2256                if connection
2257                    .base
2258                    .host_world_manager
2259                    .host_has_entity(&removed_entity)
2260                {
2261                    //remove entity from user connection
2262                    connection
2263                        .base
2264                        .host_world_manager
2265                        .despawn_entity(&removed_entity);
2266                }
2267            }
2268        }
2269
2270        for (_, room) in self.rooms.iter_mut() {
2271            // TODO: we should be able to cache these tuples of keys to avoid building a new
2272            // list each time
2273            for user_key in room.user_keys() {
2274                let Some(user) = self.users.get(user_key) else {
2275                    continue;
2276                };
2277                if !user.has_address() {
2278                    continue;
2279                }
2280                let Some(connection) = self.user_connections.get_mut(&user.address()) else {
2281                    continue;
2282                };
2283                for entity in room.entities() {
2284                    if !world.has_entity(entity) {
2285                        continue;
2286                    }
2287                    if self
2288                        .global_world_manager
2289                        .entity_is_public_and_owned_by_user(user_key, entity)
2290                    {
2291                        // entity is owned by client, but it is public, so we don't need to replicate it
2292                        continue;
2293                    }
2294
2295                    let currently_in_scope =
2296                        connection.base.host_world_manager.host_has_entity(entity);
2297
2298                    let should_be_in_scope =
2299                        if let Some(in_scope) = self.entity_scope_map.get(user_key, entity) {
2300                            *in_scope
2301                        } else {
2302                            false
2303                        };
2304
2305                    if should_be_in_scope {
2306                        if currently_in_scope {
2307                            continue;
2308                        }
2309                        let component_kinds =
2310                            self.global_world_manager.component_kinds(entity).unwrap();
2311                        // add entity & components to the connections local scope
2312                        connection.base.host_world_manager.init_entity(
2313                            &mut connection.base.local_world_manager,
2314                            entity,
2315                            component_kinds,
2316                        );
2317
2318                        // if entity is delegated, send message to connection
2319                        if !self.global_world_manager.entity_is_delegated(entity) {
2320                            continue;
2321                        }
2322                        let event_message = EntityEventMessage::new_enable_delegation(
2323                            &self.global_world_manager,
2324                            entity,
2325                        );
2326                        let mut converter = EntityConverterMut::new(
2327                            &self.global_world_manager,
2328                            &mut connection.base.local_world_manager,
2329                        );
2330                        let channel_kind = ChannelKind::of::<SystemChannel>();
2331                        let message =
2332                            MessageContainer::from_write(Box::new(event_message), &mut converter);
2333                        connection.base.message_manager.send_message(
2334                            &self.protocol.message_kinds,
2335                            &mut converter,
2336                            &channel_kind,
2337                            message,
2338                        );
2339                    } else if currently_in_scope {
2340                        // remove entity from the connections local scope
2341                        connection.base.host_world_manager.despawn_entity(entity);
2342                    }
2343                }
2344            }
2345        }
2346    }
2347}
2348
2349impl<E: Copy + Eq + Hash + Send + Sync> EntityAndGlobalEntityConverter<E> for Server<E> {
2350    fn global_entity_to_entity(
2351        &self,
2352        global_entity: &GlobalEntity,
2353    ) -> Result<E, EntityDoesNotExistError> {
2354        self.global_world_manager
2355            .global_entity_to_entity(global_entity)
2356    }
2357
2358    fn entity_to_global_entity(&self, entity: &E) -> Result<GlobalEntity, EntityDoesNotExistError> {
2359        self.global_world_manager.entity_to_global_entity(entity)
2360    }
2361}