1use std::{
2 any::Any,
3 collections::{hash_set::Iter, HashMap, HashSet},
4 hash::Hash,
5 net::SocketAddr,
6 panic,
7 time::Duration,
8};
9
10use log::{info, warn};
11
12use naia_shared::{
13 BigMap, BitReader, BitWriter, Channel, ChannelKind, ComponentKind,
14 EntityAndGlobalEntityConverter, EntityAndLocalEntityConverter, EntityAuthStatus,
15 EntityConverterMut, EntityDoesNotExistError, EntityEventMessage, EntityResponseEvent,
16 FakeEntityConverter, GlobalEntity, GlobalRequestId, GlobalResponseId, GlobalWorldManagerType,
17 Instant, Message, MessageContainer, PacketType, Protocol, RemoteEntity, Replicate,
18 ReplicatedComponent, Request, Response, ResponseReceiveKey, ResponseSendKey, Serde, SerdeErr,
19 SharedGlobalWorldManager, SocketConfig, StandardHeader, SystemChannel, Tick, Timer,
20 WorldMutType, WorldRefType,
21};
22
23use super::{
24 error::NaiaServerError,
25 events::Events,
26 room::{Room, RoomKey, RoomMut, RoomRef},
27 server_config::ServerConfig,
28 user::{User, UserKey, UserMut, UserRef},
29 user_scope::{UserScopeMut, UserScopeRef},
30};
31use crate::{
32 connection::{connection::Connection, io::Io, tick_buffer_messages::TickBufferMessages},
33 handshake::{HandshakeAction, HandshakeManager, Handshaker},
34 request::{GlobalRequestManager, GlobalResponseManager},
35 time_manager::TimeManager,
36 transport::{AuthReceiver, AuthSender, Socket},
37 world::{
38 entity_mut::EntityMut, entity_owner::EntityOwner, entity_ref::EntityRef,
39 entity_room_map::EntityRoomMap, entity_scope_map::EntityScopeMap,
40 global_world_manager::GlobalWorldManager, server_auth_handler::AuthOwner,
41 },
42 ReplicationConfig,
43};
44
45pub struct Server<E: Copy + Eq + Hash + Send + Sync> {
49 server_config: ServerConfig,
51 protocol: Protocol,
52 io: Io,
53 auth_io: Option<(Box<dyn AuthSender>, Box<dyn AuthReceiver>)>,
54 heartbeat_timer: Timer,
55 timeout_timer: Timer,
56 ping_timer: Timer,
57 handshake_manager: Box<dyn Handshaker>,
58 users: BigMap<UserKey, User>,
60 user_connections: HashMap<SocketAddr, Connection<E>>,
61 rooms: BigMap<RoomKey, Room<E>>,
63 entity_room_map: EntityRoomMap<E>,
65 entity_scope_map: EntityScopeMap<E>,
66 global_world_manager: GlobalWorldManager<E>,
67 incoming_events: Events<E>,
69 global_request_manager: GlobalRequestManager,
71 global_response_manager: GlobalResponseManager,
72 time_manager: TimeManager,
74}
75
76impl<E: Copy + Eq + Hash + Send + Sync> Server<E> {
77 pub fn new<P: Into<Protocol>>(server_config: ServerConfig, protocol: P) -> Self {
79 let mut protocol: Protocol = protocol.into();
80 protocol.lock();
81
82 let time_manager = TimeManager::new(protocol.tick_interval);
83
84 let io = Io::new(
85 &server_config.connection.bandwidth_measure_duration,
86 &protocol.compression,
87 );
88
89 Self {
90 server_config: server_config.clone(),
92 protocol,
93 io,
95 auth_io: None,
96 heartbeat_timer: Timer::new(server_config.connection.heartbeat_interval),
97 timeout_timer: Timer::new(server_config.connection.disconnection_timeout_duration),
98 ping_timer: Timer::new(server_config.ping.ping_interval),
99 handshake_manager: Box::new(HandshakeManager::new()),
100 users: BigMap::new(),
102 user_connections: HashMap::new(),
103 rooms: BigMap::new(),
105 entity_room_map: EntityRoomMap::new(),
107 entity_scope_map: EntityScopeMap::new(),
108 global_world_manager: GlobalWorldManager::new(),
109 incoming_events: Events::new(),
111 global_request_manager: GlobalRequestManager::new(),
113 global_response_manager: GlobalResponseManager::new(),
114 time_manager,
116 }
117 }
118
119 pub fn listen<S: Into<Box<dyn Socket>>>(&mut self, socket: S) {
121 let boxed_socket: Box<dyn Socket> = socket.into();
122 let (auth_sender, auth_receiver, packet_sender, packet_receiver) = boxed_socket.listen();
123
124 self.io.load(packet_sender, packet_receiver);
125
126 self.auth_io = Some((auth_sender, auth_receiver));
127 }
128
129 pub fn is_listening(&self) -> bool {
132 self.io.is_loaded()
133 }
134
135 pub fn socket_config(&self) -> &SocketConfig {
137 &self.protocol.socket
138 }
139
140 pub fn receive<W: WorldMutType<E>>(&mut self, world: W) -> Events<E> {
143 let now = Instant::now();
144
145 self.maintain_socket(world, &now);
148
149 if self.time_manager.recv_server_tick(&now) {
151 self.incoming_events
152 .push_tick(self.time_manager.current_tick());
153 }
154
155 std::mem::replace(&mut self.incoming_events, Events::<E>::new())
157 }
158
159 pub fn accept_connection(&mut self, user_key: &UserKey) {
164 let Some(user) = self.users.get_mut(user_key) else {
165 warn!("unknown user is finalizing connection...");
166 return;
167 };
168 let auth_addr = user.take_auth_address();
169
170 let identity_token = naia_shared::generate_identity_token();
172 self.handshake_manager
173 .authenticate_user(&identity_token, user_key);
174
175 let (auth_sender, _) = self
176 .auth_io
177 .as_mut()
178 .expect("Auth should be set up by this point");
179 if auth_sender.accept(&auth_addr, &identity_token).is_err() {
180 warn!(
181 "Server Error: Cannot send auth accept packet to {:?}",
182 &auth_addr
183 );
184 return;
186 }
187 }
188
189 pub fn reject_connection(&mut self, user_key: &UserKey) {
192 if let Some(user) = self.users.get_mut(user_key) {
193 let auth_addr = user.take_auth_address();
194
195 let (auth_sender, _) = self
197 .auth_io
198 .as_mut()
199 .expect("Auth should be set up by this point");
200 if auth_sender.reject(&auth_addr).is_err() {
201 warn!(
202 "Server Error: Cannot send auth reject message to {:?}",
203 &auth_addr
204 );
205 }
207
208 self.user_delete(user_key);
209 }
210 }
211
212 fn finalize_connection(&mut self, user_key: &UserKey, user_address: &SocketAddr) {
213 let Some(user) = self.users.get_mut(user_key) else {
214 warn!("unknown user is finalizing connection...");
215 return;
216 };
217 user.set_address(user_address);
218 let new_connection = Connection::new(
219 &self.server_config.connection,
220 &self.server_config.ping,
221 &user.address(),
222 user_key,
223 &self.protocol.channel_kinds,
224 &self.global_world_manager,
225 );
226
227 self.user_connections.insert(user.address(), new_connection);
228 if self.io.bandwidth_monitor_enabled() {
229 self.io.register_client(&user.address());
230 }
231 self.incoming_events.push_connection(user_key);
232 }
233
234 pub fn send_message<C: Channel, M: Message>(&mut self, user_key: &UserKey, message: &M) {
239 let cloned_message = M::clone_box(message);
240 self.send_message_inner(user_key, &ChannelKind::of::<C>(), cloned_message);
241 }
242
243 fn send_message_inner(
246 &mut self,
247 user_key: &UserKey,
248 channel_kind: &ChannelKind,
249 message_box: Box<dyn Message>,
250 ) {
251 let channel_settings = self.protocol.channel_kinds.channel(channel_kind);
252
253 if !channel_settings.can_send_to_client() {
254 panic!("Cannot send message to Client on this Channel");
255 }
256
257 if let Some(user) = self.users.get(user_key) {
258 if !user.has_address() {
259 return;
260 }
261 if let Some(connection) = self.user_connections.get_mut(&user.address()) {
262 let mut converter = EntityConverterMut::new(
263 &self.global_world_manager,
264 &mut connection.base.local_world_manager,
265 );
266 let message = MessageContainer::from_write(message_box, &mut converter);
267 connection.base.message_manager.send_message(
268 &self.protocol.message_kinds,
269 &mut converter,
270 channel_kind,
271 message,
272 );
273 }
274 }
275 }
276
277 pub fn broadcast_message<C: Channel, M: Message>(&mut self, message: &M) {
279 let cloned_message = M::clone_box(message);
280 self.broadcast_message_inner(&ChannelKind::of::<C>(), cloned_message);
281 }
282
283 fn broadcast_message_inner(
284 &mut self,
285 channel_kind: &ChannelKind,
286 message_box: Box<dyn Message>,
287 ) {
288 self.user_keys().iter().for_each(|user_key| {
289 self.send_message_inner(user_key, channel_kind, message_box.clone())
290 })
291 }
292
293 pub fn send_request<C: Channel, Q: Request>(
295 &mut self,
296 user_key: &UserKey,
297 request: &Q,
298 ) -> Result<ResponseReceiveKey<Q::Response>, NaiaServerError> {
299 let cloned_request = Q::clone_box(request);
300 let id = self.send_request_inner(user_key, &ChannelKind::of::<C>(), cloned_request)?;
301 Ok(ResponseReceiveKey::new(id))
302 }
303
304 fn send_request_inner(
305 &mut self,
306 user_key: &UserKey,
307 channel_kind: &ChannelKind,
308 request_box: Box<dyn Message>,
309 ) -> Result<GlobalRequestId, NaiaServerError> {
310 let channel_settings = self.protocol.channel_kinds.channel(&channel_kind);
311
312 if !channel_settings.can_request_and_respond() {
313 panic!("Requests can only be sent over Bidirectional, Reliable Channels");
314 }
315
316 let request_id = self.global_request_manager.create_request_id(user_key);
317
318 let Some(user) = self.users.get(user_key) else {
319 warn!("user does not exist");
320 return Err(NaiaServerError::Message("user does not exist".to_string()));
321 };
322 if !user.has_address() {
323 warn!("currently not connected to user");
324 return Err(NaiaServerError::Message(
325 "currently not connected to user".to_string(),
326 ));
327 }
328 let Some(connection) = self.user_connections.get_mut(&user.address()) else {
329 warn!("currently not connected to user");
330 return Err(NaiaServerError::Message(
331 "currently not connected to user".to_string(),
332 ));
333 };
334 let mut converter = EntityConverterMut::new(
335 &self.global_world_manager,
336 &mut connection.base.local_world_manager,
337 );
338
339 let message = MessageContainer::from_write(request_box, &mut converter);
340 connection.base.message_manager.send_request(
341 &self.protocol.message_kinds,
342 &mut converter,
343 channel_kind,
344 request_id,
345 message,
346 );
347
348 return Ok(request_id);
349 }
350
351 pub fn send_response<S: Response>(
353 &mut self,
354 response_key: &ResponseSendKey<S>,
355 response: &S,
356 ) -> bool {
357 let response_id = response_key.response_id();
358
359 let cloned_response = S::clone_box(response);
360
361 self.send_response_inner(&response_id, cloned_response)
362 }
363
364 fn send_response_inner(
366 &mut self,
367 response_id: &GlobalResponseId,
368 response_box: Box<dyn Message>,
369 ) -> bool {
370 let Some((user_key, channel_kind, local_response_id)) = self
371 .global_response_manager
372 .destroy_response_id(&response_id)
373 else {
374 return false;
375 };
376 let Some(user) = self.users.get(&user_key) else {
377 return false;
378 };
379 if !user.has_address() {
380 warn!("currently not connected to user");
381 return false;
382 }
383 let Some(connection) = self.user_connections.get_mut(&user.address()) else {
384 return false;
385 };
386 let mut converter = EntityConverterMut::new(
387 &self.global_world_manager,
388 &mut connection.base.local_world_manager,
389 );
390 let response = MessageContainer::from_write(response_box, &mut converter);
391 connection.base.message_manager.send_response(
392 &self.protocol.message_kinds,
393 &mut converter,
394 &channel_kind,
395 local_response_id,
396 response,
397 );
398 return true;
399 }
400
401 pub fn receive_response<S: Response>(
402 &mut self,
403 response_key: &ResponseReceiveKey<S>,
404 ) -> Option<(UserKey, S)> {
405 let request_id = response_key.request_id();
406 let Some((user_key, container)) =
407 self.global_request_manager.destroy_request_id(&request_id)
408 else {
409 return None;
410 };
411 let response: S = Box::<dyn Any + 'static>::downcast::<S>(container.to_boxed_any())
412 .ok()
413 .map(|boxed_s| *boxed_s)
414 .unwrap();
415 return Some((user_key, response));
416 }
417 pub fn receive_tick_buffer_messages(&mut self, tick: &Tick) -> TickBufferMessages {
420 let mut tick_buffer_messages = TickBufferMessages::new();
421 for (_user_address, connection) in self.user_connections.iter_mut() {
422 connection.tick_buffer_messages(tick, &mut tick_buffer_messages);
424 }
425 tick_buffer_messages
426 }
427
428 pub fn scope_checks(&self) -> Vec<(RoomKey, UserKey, E)> {
440 let mut list: Vec<(RoomKey, UserKey, E)> = Vec::new();
441
442 for (room_key, room) in self.rooms.iter() {
445 for user_key in room.user_keys() {
446 for entity in room.entities() {
447 list.push((room_key, *user_key, *entity));
448 }
449 }
450 }
451
452 list
453 }
454
455 pub fn send_all_updates<W: WorldRefType<E>>(&mut self, world: W) {
459 let now = Instant::now();
460
461 self.update_entity_scopes(&world);
463
464 let mut user_addresses: Vec<SocketAddr> = self.user_connections.keys().copied().collect();
466
467 fastrand::shuffle(&mut user_addresses);
469
470 for user_address in user_addresses {
471 let connection = self.user_connections.get_mut(&user_address).unwrap();
472
473 connection.send_packets(
474 &self.protocol,
475 &now,
476 &mut self.io,
477 &world,
478 &self.global_world_manager,
479 &self.time_manager,
480 );
481 }
482 }
483
484 pub fn spawn_entity<W: WorldMutType<E>>(&mut self, mut world: W) -> EntityMut<E, W> {
489 let entity = world.spawn_entity();
490 self.spawn_entity_inner(&entity);
491
492 EntityMut::new(self, world, &entity)
493 }
494
495 fn spawn_entity_inner(&mut self, entity: &E) {
497 self.global_world_manager
498 .spawn_entity_record(entity, EntityOwner::Server);
499 }
500
501 pub fn enable_entity_replication(&mut self, entity: &E) {
503 self.spawn_entity_inner(&entity);
504 }
505
506 pub fn disable_entity_replication(&mut self, entity: &E) {
508 self.despawn_entity_worldless(entity);
510 }
511
512 pub fn pause_entity_replication(&mut self, entity: &E) {
513 self.global_world_manager.pause_entity_replication(entity);
514 }
515
516 pub fn resume_entity_replication(&mut self, entity: &E) {
517 self.global_world_manager.resume_entity_replication(entity);
518 }
519
520 pub fn entity_replication_config(&self, entity: &E) -> Option<ReplicationConfig> {
522 self.global_world_manager.entity_replication_config(entity)
523 }
524
525 pub fn entity_take_authority(&mut self, entity: &E) {
527 let did_change = self.global_world_manager.server_take_authority(entity);
528
529 if did_change {
530 self.send_reset_authority_messages(entity);
531 self.incoming_events.push_auth_reset(entity);
532 }
533 }
534
535 fn send_reset_authority_messages(&mut self, entity: &E) {
536 let mut messages_to_send = Vec::new();
542 for (user_key, user) in self.users.iter() {
543 if !user.has_address() {
544 continue;
545 }
546 if let Some(connection) = self.user_connections.get_mut(&user.address()) {
547 if connection.base.host_world_manager.host_has_entity(entity) {
548 let message = EntityEventMessage::new_update_auth_status(
549 &self.global_world_manager,
550 entity,
551 EntityAuthStatus::Available,
552 );
553 messages_to_send.push((user_key, message));
554 }
555
556 if connection
558 .base
559 .local_world_manager
560 .has_both_host_and_remote_entity(entity)
561 {
562 Self::remove_redundant_remote_entity_from_host(connection, entity);
563 }
564 }
565 }
566 for (user_key, message) in messages_to_send {
567 self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
568 }
569 }
570
571 pub fn configure_entity_replication<W: WorldMutType<E>>(
572 &mut self,
573 world: &mut W,
574 entity: &E,
575 config: ReplicationConfig,
576 ) {
577 if !self.global_world_manager.has_entity(entity) {
578 panic!("Entity is not yet replicating. Be sure to call `enable_replication` or `spawn_entity` on the Server, before configuring replication.");
579 }
580 let entity_owner = self.global_world_manager.entity_owner(entity).unwrap();
581 let server_owned: bool = entity_owner.is_server();
582 let client_owned: bool = entity_owner.is_client();
583 let next_config = config;
584 let prev_config = self
585 .global_world_manager
586 .entity_replication_config(entity)
587 .unwrap();
588 if prev_config == config {
589 panic!(
590 "Entity replication config is already set to {:?}. Should not set twice.",
591 config
592 );
593 }
594
595 match prev_config {
596 ReplicationConfig::Private => {
597 if server_owned {
598 panic!("Server-owned entity should never be private");
599 }
600 match next_config {
601 ReplicationConfig::Private => {
602 panic!("Should not be able to happen");
603 }
604 ReplicationConfig::Public => {
605 self.publish_entity(world, entity, true);
607 }
608 ReplicationConfig::Delegated => {
609 if client_owned {
611 panic!("Cannot downgrade Client's ownership of Entity to Delegated. Do this Client-side if needed.");
612 }
615 self.publish_entity(world, entity, true);
616 self.entity_enable_delegation(world, entity, None);
617 }
618 }
619 }
620 ReplicationConfig::Public => {
621 match next_config {
622 ReplicationConfig::Private => {
623 if server_owned {
625 panic!("Cannot unpublish a Server-owned Entity (doing so would disable replication entirely, just use a local entity instead)");
626 }
627 self.unpublish_entity(world, entity, true);
628 }
629 ReplicationConfig::Public => {
630 panic!("Should not be able to happen");
631 }
632 ReplicationConfig::Delegated => {
633 if client_owned {
635 panic!("Cannot downgrade Client's ownership of Entity to Delegated. Do this Client-side if needed.");
636 }
639 self.entity_enable_delegation(world, entity, None);
640 }
641 }
642 }
643 ReplicationConfig::Delegated => {
644 if client_owned {
645 panic!("Client-owned entity should never be delegated");
646 }
647 match next_config {
648 ReplicationConfig::Private => {
649 if server_owned {
651 panic!("Cannot unpublish a Server-owned Entity (doing so would disable replication entirely, just use a local entity instead)");
652 }
653 self.entity_disable_delegation(world, entity);
654 self.unpublish_entity(world, entity, true);
655 }
656 ReplicationConfig::Public => {
657 self.entity_disable_delegation(world, entity);
659 }
660 ReplicationConfig::Delegated => {
661 panic!("Should not be able to happen");
662 }
663 }
664 }
665 }
666 }
667
668 pub fn client_request_authority(
670 &mut self,
671 origin_user: &UserKey,
672 world_entity: &E,
673 remote_entity: &RemoteEntity,
674 ) {
675 let requester = AuthOwner::Client(*origin_user);
676 let success = self
677 .global_world_manager
678 .client_request_authority(&world_entity, &requester);
679 if success {
680 self.add_redundant_remote_entity_to_host(origin_user, world_entity, remote_entity);
683
684 let mut messages_to_send = Vec::new();
689 for (user_key, user) in self.users.iter() {
690 if !user.has_address() {
691 continue;
692 }
693 if let Some(connection) = self.user_connections.get(&user.address()) {
694 if connection
695 .base
696 .host_world_manager
697 .host_has_entity(world_entity)
698 {
699 let mut new_status: EntityAuthStatus = EntityAuthStatus::Denied;
700 if *origin_user == user_key {
701 new_status = EntityAuthStatus::Granted;
702 }
703
704 let message = EntityEventMessage::new_update_auth_status(
711 &self.global_world_manager,
712 world_entity,
713 new_status,
714 );
715
716 messages_to_send.push((user_key, message));
717 }
718 }
719 }
720 for (user_key, message) in messages_to_send {
721 self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
722 }
723
724 self.incoming_events
725 .push_auth_grant(origin_user, &world_entity);
726 } else {
727 panic!("Failed to request authority for entity");
728 }
729 }
730
731 fn entity_enable_delegation_response(&mut self, user_key: &UserKey, entity: &E) {
732 if self.global_world_manager.entity_is_delegated(entity) {
733 let Some(auth_status) = self.global_world_manager.entity_authority_status(entity)
734 else {
735 panic!("Entity should have an Auth status if it is delegated..")
736 };
737 if auth_status != EntityAuthStatus::Available {
738 let message = EntityEventMessage::new_update_auth_status(
739 &self.global_world_manager,
740 entity,
741 auth_status,
742 );
743 self.send_message::<SystemChannel, EntityEventMessage>(user_key, &message);
744 }
745 }
746 }
747
748 pub fn entity_authority_status(&self, entity: &E) -> Option<EntityAuthStatus> {
750 self.global_world_manager.entity_authority_status(entity)
751 }
752
753 fn add_redundant_remote_entity_to_host(
754 &mut self,
755 user_key: &UserKey,
756 world_entity: &E,
757 remote_entity: &RemoteEntity,
758 ) {
759 if let Some(user) = self.users.get(user_key) {
760 if !user.has_address() {
761 return;
762 }
763 let connection = self.user_connections.get_mut(&user.address()).unwrap();
764
765 connection
767 .base
768 .local_world_manager
769 .insert_remote_entity(world_entity, *remote_entity);
770
771 let component_kinds = self
773 .global_world_manager
774 .component_kinds(world_entity)
775 .unwrap();
776 connection
777 .base
778 .remote_world_reader
779 .track_hosts_redundant_remote_entity(remote_entity, &component_kinds);
780 }
781 }
782
783 fn remove_redundant_remote_entity_from_host(connection: &mut Connection<E>, world_entity: &E) {
784 let remote_entity = connection
785 .base
786 .local_world_manager
787 .remove_redundant_remote_entity(world_entity);
788 connection
789 .base
790 .remote_world_reader
791 .untrack_hosts_redundant_remote_entity(&remote_entity);
792 connection
793 .base
794 .remote_world_manager
795 .on_entity_channel_closing(&remote_entity);
796 }
797
798 pub fn entity_release_authority(&mut self, origin_user: Option<&UserKey>, entity: &E) {
800 let releaser = AuthOwner::from_user_key(origin_user);
801 let success = self
802 .global_world_manager
803 .client_release_authority(&entity, &releaser);
804 if success {
805 self.send_reset_authority_messages(entity);
806 }
807 }
808
809 pub fn entity<W: WorldRefType<E>>(&self, world: W, entity: &E) -> EntityRef<E, W> {
813 if world.has_entity(entity) {
814 return EntityRef::new(self, world, entity);
815 }
816 panic!("No Entity exists for given Key!");
817 }
818
819 pub fn entity_mut<W: WorldMutType<E>>(&mut self, world: W, entity: &E) -> EntityMut<E, W> {
823 if world.has_entity(entity) {
824 return EntityMut::new(self, world, entity);
825 }
826 panic!("No Entity exists for given Key!");
827 }
828
829 pub fn entities<W: WorldRefType<E>>(&self, world: W) -> Vec<E> {
831 world.entities()
832 }
833
834 pub fn entity_owner(&self, entity: &E) -> EntityOwner {
835 if let Some(owner) = self.global_world_manager.entity_owner(entity) {
836 return owner;
837 }
838 return EntityOwner::Local;
839 }
840
841 pub fn user_exists(&self, user_key: &UserKey) -> bool {
845 self.users.contains_key(user_key)
846 }
847
848 pub fn user(&self, user_key: &UserKey) -> UserRef<E> {
852 if self.users.contains_key(user_key) {
853 return UserRef::new(self, user_key);
854 }
855 panic!("No User exists for given Key!");
856 }
857
858 pub fn user_mut(&mut self, user_key: &UserKey) -> UserMut<E> {
862 if self.users.contains_key(user_key) {
863 return UserMut::new(self, user_key);
864 }
865 panic!("No User exists for given Key!");
866 }
867
868 pub fn user_keys(&self) -> Vec<UserKey> {
870 let mut output = Vec::new();
871
872 for (user_key, user) in self.users.iter() {
873 if !user.has_address() {
874 continue;
875 }
876 if self.user_connections.contains_key(&user.address()) {
877 output.push(user_key);
878 }
879 }
880
881 output
882 }
883
884 pub fn users_count(&self) -> usize {
886 self.users.len()
887 }
888
889 pub fn user_scope(&self, user_key: &UserKey) -> UserScopeRef<E> {
891 if self.users.contains_key(user_key) {
892 return UserScopeRef::new(self, user_key);
893 }
894 panic!("No User exists for given Key!");
895 }
896
897 pub fn user_scope_mut(&mut self, user_key: &UserKey) -> UserScopeMut<E> {
900 if self.users.contains_key(user_key) {
901 return UserScopeMut::new(self, user_key);
902 }
903 panic!("No User exists for given Key!");
904 }
905
906 pub fn make_room(&mut self) -> RoomMut<E> {
912 let new_room = Room::new();
913 let room_key = self.rooms.insert(new_room);
914 RoomMut::new(self, &room_key)
915 }
916
917 pub fn room_exists(&self, room_key: &RoomKey) -> bool {
919 self.rooms.contains_key(room_key)
920 }
921
922 pub fn room(&self, room_key: &RoomKey) -> RoomRef<E> {
926 if self.rooms.contains_key(room_key) {
927 return RoomRef::new(self, room_key);
928 }
929 panic!("No Room exists for given Key!");
930 }
931
932 pub fn room_mut(&mut self, room_key: &RoomKey) -> RoomMut<E> {
936 if self.rooms.contains_key(room_key) {
937 return RoomMut::new(self, room_key);
938 }
939 panic!("No Room exists for given Key!");
940 }
941
942 pub fn room_keys(&self) -> Vec<RoomKey> {
944 let mut output = Vec::new();
945
946 for (key, _) in self.rooms.iter() {
947 output.push(key);
948 }
949
950 output
951 }
952
953 pub fn rooms_count(&self) -> usize {
955 self.rooms.len()
956 }
957
958 pub fn current_tick(&self) -> Tick {
962 return self.time_manager.current_tick();
963 }
964
965 pub fn average_tick_duration(&self) -> Duration {
967 self.time_manager.average_tick_duration()
968 }
969
970 pub fn outgoing_bandwidth_total(&mut self) -> f32 {
972 self.io.outgoing_bandwidth_total()
973 }
974
975 pub fn incoming_bandwidth_total(&mut self) -> f32 {
976 self.io.incoming_bandwidth_total()
977 }
978
979 pub fn outgoing_bandwidth_to_client(&mut self, address: &SocketAddr) -> f32 {
980 self.io.outgoing_bandwidth_to_client(address)
981 }
982
983 pub fn incoming_bandwidth_from_client(&mut self, address: &SocketAddr) -> f32 {
984 self.io.incoming_bandwidth_from_client(address)
985 }
986
987 pub fn rtt(&self, user_key: &UserKey) -> Option<f32> {
990 if let Some(user) = self.users.get(user_key) {
991 if !user.has_address() {
992 return None;
993 }
994 if let Some(connection) = self.user_connections.get(&user.address()) {
995 return Some(connection.ping_manager.rtt_average);
996 }
997 }
998 None
999 }
1000
1001 pub fn jitter(&self, user_key: &UserKey) -> Option<f32> {
1004 if let Some(user) = self.users.get(user_key) {
1005 if !user.has_address() {
1006 return None;
1007 }
1008 if let Some(connection) = self.user_connections.get(&user.address()) {
1009 return Some(connection.ping_manager.jitter_average);
1010 }
1011 }
1012 None
1013 }
1014
1015 pub(crate) fn despawn_entity<W: WorldMutType<E>>(&mut self, world: &mut W, entity: &E) {
1023 if !world.has_entity(entity) {
1024 panic!("attempted to de-spawn nonexistent entity");
1025 }
1026
1027 world.despawn_entity(entity);
1029
1030 self.despawn_entity_worldless(entity);
1031 }
1032
1033 pub fn despawn_entity_worldless(&mut self, entity: &E) {
1034 if !self.global_world_manager.has_entity(entity) {
1035 info!("attempting to despawn entity that does not exist, this can happen if a delegated entity is being despawned");
1036 return;
1037 }
1038 self.cleanup_entity_replication(entity);
1039 self.global_world_manager.remove_entity_record(entity);
1040 }
1041
1042 fn cleanup_entity_replication(&mut self, entity: &E) {
1043 self.despawn_entity_from_all_connections(entity);
1044
1045 self.entity_scope_map.remove_entity(entity);
1047
1048 if let Some(room_keys) = self.entity_room_map.remove_from_all_rooms(entity) {
1050 for room_key in room_keys {
1051 if let Some(room) = self.rooms.get_mut(&room_key) {
1052 room.remove_entity(entity, true);
1053 }
1054 }
1055 }
1056
1057 self.global_world_manager
1059 .remove_entity_diff_handlers(entity);
1060 }
1061
1062 fn despawn_entity_from_all_connections(&mut self, entity: &E) {
1063 for (_, connection) in self.user_connections.iter_mut() {
1066 if connection.base.host_world_manager.host_has_entity(entity) {
1067 connection.base.host_world_manager.despawn_entity(entity);
1069 }
1070 }
1071 }
1072
1073 pub(crate) fn user_scope_remove_user(&mut self, user_key: &UserKey) {
1077 self.entity_scope_map.remove_user(user_key);
1078 }
1079
1080 pub(crate) fn user_scope_set_entity(
1081 &mut self,
1082 user_key: &UserKey,
1083 entity: &E,
1084 is_contained: bool,
1085 ) {
1086 self.entity_scope_map
1087 .insert(*user_key, *entity, is_contained);
1088 }
1089
1090 pub(crate) fn user_scope_has_entity(&self, user_key: &UserKey, entity: &E) -> bool {
1091 if let Some(in_scope) = self.entity_scope_map.get(user_key, entity) {
1092 *in_scope
1093 } else {
1094 false
1095 }
1096 }
1097
1098 pub(crate) fn insert_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1102 &mut self,
1103 world: &mut W,
1104 entity: &E,
1105 mut component: R,
1106 ) {
1107 if !world.has_entity(entity) {
1108 panic!("attempted to add component to non-existent entity");
1109 }
1110
1111 let component_kind = component.kind();
1112
1113 if world.has_component_of_kind(entity, &component_kind) {
1114 let Some(mut component_mut) = world.component_mut::<R>(entity) else {
1117 panic!("Should never happen because we checked for this above");
1118 };
1119 component_mut.mirror(&component);
1120 } else {
1121 self.insert_component_worldless(entity, &mut component);
1123
1124 world.insert_component(entity, component);
1126 }
1127 }
1128
1129 pub fn insert_component_worldless(&mut self, entity: &E, component: &mut dyn Replicate) {
1131 let component_kind = component.kind();
1132
1133 if self
1134 .global_world_manager
1135 .has_component_record(entity, &component_kind)
1136 {
1137 warn!(
1138 "Attempted to add component `{:?}` to entity that already has it, this can happen if a delegated entity's auth is transferred to the Server before the Server Adapter has been able to process the newly inserted Component. Skipping this action.",
1139 component.name());
1140 return;
1141 }
1142
1143 self.insert_new_component_into_entity_scopes(entity, &component_kind, None);
1144
1145 self.global_world_manager
1147 .insert_component_record(entity, &component_kind);
1148 self.global_world_manager
1149 .insert_component_diff_handler(entity, component);
1150
1151 if self.global_world_manager.entity_is_delegated(entity) {
1153 let accessor = self.global_world_manager.get_entity_auth_accessor(entity);
1154 component.enable_delegation(&accessor, None)
1155 }
1156 }
1157
1158 fn insert_new_component_into_entity_scopes(
1159 &mut self,
1160 entity: &E,
1161 component_kind: &ComponentKind,
1162 excluding_user_opt: Option<&UserKey>,
1163 ) {
1164 let excluding_addr_opt: Option<SocketAddr> = {
1165 if let Some(user_key) = excluding_user_opt {
1166 if let Some(user) = self.users.get(user_key) {
1167 if !user.has_address() {
1168 None
1169 } else {
1170 Some(user.address())
1171 }
1172 } else {
1173 None
1174 }
1175 } else {
1176 None
1177 }
1178 };
1179 for (addr, connection) in self.user_connections.iter_mut() {
1181 if let Some(exclude_addr) = excluding_addr_opt {
1182 if addr == &exclude_addr {
1183 continue;
1184 }
1185 }
1186
1187 if connection.base.host_world_manager.host_has_entity(entity) {
1189 connection
1190 .base
1191 .host_world_manager
1192 .insert_component(entity, &component_kind);
1193 }
1194 }
1195 }
1196
1197 pub(crate) fn remove_component<R: ReplicatedComponent, W: WorldMutType<E>>(
1199 &mut self,
1200 world: &mut W,
1201 entity: &E,
1202 ) -> Option<R> {
1203 self.remove_component_worldless(entity, &ComponentKind::of::<R>());
1204
1205 world.remove_component::<R>(entity)
1207 }
1208
1209 pub fn remove_component_worldless(&mut self, entity: &E, component_kind: &ComponentKind) {
1211 self.remove_component_from_all_connections(entity, component_kind);
1212
1213 self.global_world_manager
1215 .remove_component_record(entity, &component_kind);
1216 self.global_world_manager
1217 .remove_component_diff_handler(entity, &component_kind);
1218 }
1219
1220 fn remove_component_from_all_connections(
1221 &mut self,
1222 entity: &E,
1223 component_kind: &ComponentKind,
1224 ) {
1225 for (_, connection) in self.user_connections.iter_mut() {
1228 if connection.base.host_world_manager.host_has_entity(entity) {
1229 connection
1231 .base
1232 .host_world_manager
1233 .remove_component(entity, &component_kind);
1234 }
1235 }
1236 }
1237
1238 pub(crate) fn publish_entity<W: WorldMutType<E>>(
1241 &mut self,
1242 world: &mut W,
1243 entity: &E,
1244 server_origin: bool,
1245 ) -> bool {
1246 if server_origin {
1247 let entity_owner = self.global_world_manager.entity_owner(&entity);
1249 let Some(EntityOwner::Client(user_key)) = entity_owner else {
1250 panic!(
1251 "Entity is not owned by a Client. Cannot publish entity. Owner is: {:?}",
1252 entity_owner
1253 );
1254 };
1255 let message = EntityEventMessage::new_publish(&self.global_world_manager, entity);
1256 self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1257 }
1258
1259 let result = self.global_world_manager.entity_publish(&entity);
1260 if result {
1261 world.entity_publish(&self.global_world_manager, &entity);
1262 }
1263 return result;
1264 }
1265
1266 pub(crate) fn unpublish_entity<W: WorldMutType<E>>(
1267 &mut self,
1268 world: &mut W,
1269 entity: &E,
1270 server_origin: bool,
1271 ) {
1272 if server_origin {
1273 let entity_owner = self.global_world_manager.entity_owner(&entity);
1275 let Some(EntityOwner::ClientPublic(user_key)) = entity_owner else {
1276 panic!("Entity is not owned by a Client or is Private. Cannot publish entity. Owner is: {:?}", entity_owner);
1277 };
1278 let message = EntityEventMessage::new_unpublish(&self.global_world_manager, entity);
1279 self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1280 }
1281
1282 world.entity_unpublish(&entity);
1283 self.global_world_manager.entity_unpublish(&entity);
1284 self.cleanup_entity_replication(&entity);
1285 }
1286
1287 pub(crate) fn entity_enable_delegation<W: WorldMutType<E>>(
1288 &mut self,
1289 world: &mut W,
1290 entity: &E,
1291 client_origin: Option<UserKey>,
1292 ) {
1293 {
1296 let mut messages_to_send = Vec::new();
1302 for (user_key, user) in self.users.iter() {
1303 if !user.has_address() {
1304 continue;
1305 }
1306 if let Some(connection) = self.user_connections.get(&user.address()) {
1307 if connection.base.host_world_manager.host_has_entity(entity) {
1308 let message = EntityEventMessage::new_enable_delegation(
1309 &self.global_world_manager,
1310 entity,
1311 );
1312 messages_to_send.push((user_key, message));
1313 }
1314 }
1315 }
1316 for (user_key, message) in messages_to_send {
1317 self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1318 }
1319 }
1320
1321 if let Some(client_key) = client_origin {
1322 self.enable_delegation_client_owned_entity(world, entity, &client_key);
1323 } else {
1324 self.global_world_manager.entity_enable_delegation(&entity);
1325 world.entity_enable_delegation(&self.global_world_manager, &entity);
1326 }
1327 }
1328
1329 pub(crate) fn enable_delegation_client_owned_entity<W: WorldMutType<E>>(
1330 &mut self,
1331 world: &mut W,
1332 entity: &E,
1333 client_key: &UserKey,
1334 ) {
1335 let Some(entity_owner) = self.global_world_manager.entity_owner(entity) else {
1336 panic!("entity should have an owner at this point");
1337 };
1338 let owner_user_key;
1339 match entity_owner {
1340 EntityOwner::Client(user_key) => {
1341 owner_user_key = user_key;
1342 warn!(
1343 "entity should be owned by a public client at this point. Owner is: {:?}",
1344 entity_owner
1345 );
1346
1347 let result = self.global_world_manager.entity_publish(&entity);
1351 if result {
1352 world.entity_publish(&self.global_world_manager, &entity);
1353 } else {
1354 warn!("failed to publish entity before enabling delegation");
1355 return;
1356 }
1357 }
1358 EntityOwner::ClientPublic(user_key) => {
1359 owner_user_key = user_key;
1360 }
1361 _owner => {
1362 panic!(
1363 "entity should be owned by a public client at this point. Owner is: {:?}",
1364 entity_owner
1365 );
1366 }
1367 }
1368 let user_key = owner_user_key;
1369 self.global_world_manager.migrate_entity_to_server(&entity);
1370
1371 self.entity_scope_map.insert(user_key, *entity, true);
1373
1374 let Some(user) = self.users.get(&user_key) else {
1376 panic!("user should exist");
1377 };
1378 if !user.has_address() {
1379 panic!("user should have an address");
1380 }
1381 let Some(connection) = self.user_connections.get_mut(&user.address()) else {
1382 panic!("connection does not exist")
1383 };
1384 let component_kinds = self.global_world_manager.component_kinds(entity).unwrap();
1385
1386 let new_host_entity = connection.base.host_world_manager.track_remote_entity(
1388 &mut connection.base.local_world_manager,
1389 entity,
1390 component_kinds,
1391 );
1392
1393 let message = EntityEventMessage::new_entity_migrate_response(
1395 &self.global_world_manager,
1396 &entity,
1397 new_host_entity,
1398 );
1399 self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1400
1401 self.global_world_manager.entity_enable_delegation(&entity);
1402 world.entity_enable_delegation(&self.global_world_manager, &entity);
1403
1404 let requester = AuthOwner::from_user_key(Some(client_key));
1406 let success = self
1407 .global_world_manager
1408 .client_request_authority(&entity, &requester);
1409 if !success {
1410 panic!("failed to grant authority of client-owned delegated entity to creating user");
1411 }
1412 }
1413
1414 pub(crate) fn entity_disable_delegation<W: WorldMutType<E>>(
1415 &mut self,
1416 world: &mut W,
1417 entity: &E,
1418 ) {
1419 info!("server.entity_disable_delegation");
1421 {
1423 let mut messages_to_send = Vec::new();
1426 for (user_key, user) in self.users.iter() {
1427 if !user.has_address() {
1428 continue;
1429 }
1430 let connection = self.user_connections.get(&user.address()).unwrap();
1431 if connection.base.host_world_manager.host_has_entity(entity) {
1432 let message = EntityEventMessage::new_disable_delegation(
1433 &self.global_world_manager,
1434 entity,
1435 );
1436 messages_to_send.push((user_key, message));
1437 }
1438 }
1439 for (user_key, message) in messages_to_send {
1440 self.send_message::<SystemChannel, EntityEventMessage>(&user_key, &message);
1441 }
1442 }
1443
1444 self.global_world_manager.entity_disable_delegation(&entity);
1445 world.entity_disable_delegation(&entity);
1446 }
1447
1448 pub(crate) fn user_address(&self, user_key: &UserKey) -> Option<SocketAddr> {
1452 if let Some(user) = self.users.get(user_key) {
1453 if user.has_address() {
1454 return Some(user.address());
1455 }
1456 }
1457 None
1458 }
1459
1460 pub(crate) fn user_room_keys(&self, user_key: &UserKey) -> Option<Iter<RoomKey>> {
1462 if let Some(user) = self.users.get(user_key) {
1463 return Some(user.room_keys().iter());
1464 }
1465 return None;
1466 }
1467
1468 pub(crate) fn user_rooms_count(&self, user_key: &UserKey) -> Option<usize> {
1470 if let Some(user) = self.users.get(user_key) {
1471 return Some(user.room_count());
1472 }
1473 return None;
1474 }
1475
1476 pub(crate) fn user_disconnect<W: WorldMutType<E>>(
1477 &mut self,
1478 user_key: &UserKey,
1479 world: &mut W,
1480 ) {
1481 if self.protocol.client_authoritative_entities {
1482 self.despawn_all_remote_entities(user_key, world);
1483 if let Some(all_owned_entities) =
1484 self.global_world_manager.user_all_owned_entities(user_key)
1485 {
1486 let copied_entities = all_owned_entities.clone();
1487 for entity in copied_entities {
1488 self.entity_release_authority(Some(user_key), &entity);
1489 }
1490 }
1491 }
1492 let user = self.user_delete(user_key);
1493 self.incoming_events.push_disconnection(user_key, user);
1494 }
1495
1496 pub(crate) fn user_queue_disconnect(&mut self, user_key: &UserKey) {
1497 let Some(user) = self.users.get(user_key) else {
1498 panic!("Attempting to disconnect a nonexistent user");
1499 };
1500 if !user.has_address() {
1501 panic!("Attempting to disconnect a nonexistent connection");
1502 }
1503 let Some(connection) = self.user_connections.get_mut(&user.address()) else {
1504 panic!("Attempting to disconnect a nonexistent connection");
1505 };
1506 connection.manual_disconnect = true;
1507 }
1508
1509 pub(crate) fn despawn_all_remote_entities<W: WorldMutType<E>>(
1511 &mut self,
1512 user_key: &UserKey,
1513 world: &mut W,
1514 ) {
1515 let Some(user) = self.users.get(user_key) else {
1516 panic!("Attempting to despawn entities for a nonexistent user");
1517 };
1518 if !user.has_address() {
1519 return;
1520 }
1521 let Some(connection) = self.user_connections.get_mut(&user.address()) else {
1522 panic!("Attempting to despawn entities on a nonexistent connection");
1523 };
1524
1525 let remote_entities = connection.base.remote_entities();
1526 let entity_events = SharedGlobalWorldManager::<E>::despawn_all_entities(
1527 world,
1528 &self.global_world_manager,
1529 remote_entities,
1530 );
1531 let response_events = self
1532 .incoming_events
1533 .receive_entity_events(user_key, entity_events);
1534 self.process_response_events(world, user_key, response_events);
1535 }
1536
1537 pub(crate) fn user_delete(&mut self, user_key: &UserKey) -> User {
1538 let Some(user) = self.users.remove(user_key) else {
1539 panic!("Attempting to delete non-existant user!");
1540 };
1541
1542 if let Some(user_addr) = user.address_opt() {
1543 info!("deleting authenticated user for {}", user.address());
1544 self.user_connections.remove(&user_addr);
1545 }
1546
1547 self.entity_scope_map.remove_user(user_key);
1548
1549 self.handshake_manager
1550 .delete_user(user_key, user.address_opt());
1551
1552 for room_key in user.room_keys() {
1554 self.rooms
1555 .get_mut(room_key)
1556 .unwrap()
1557 .unsubscribe_user(user_key);
1558 }
1559
1560 if self.io.bandwidth_monitor_enabled() {
1562 self.io.deregister_client(&user.address());
1563 }
1564
1565 return user;
1566 }
1567
1568 pub(crate) fn room_destroy(&mut self, room_key: &RoomKey) -> bool {
1573 self.room_remove_all_entities(room_key);
1574
1575 if self.rooms.contains_key(room_key) {
1576 let room = self.rooms.remove(room_key).unwrap();
1580 for user_key in room.user_keys() {
1581 self.users.get_mut(user_key).unwrap().uncache_room(room_key);
1582 }
1583
1584 true
1585 } else {
1586 false
1587 }
1588 }
1589
1590 pub(crate) fn room_has_user(&self, room_key: &RoomKey, user_key: &UserKey) -> bool {
1595 if let Some(room) = self.rooms.get(room_key) {
1596 return room.has_user(user_key);
1597 }
1598 false
1599 }
1600
1601 pub(crate) fn room_add_user(&mut self, room_key: &RoomKey, user_key: &UserKey) {
1605 if let Some(user) = self.users.get_mut(user_key) {
1606 if let Some(room) = self.rooms.get_mut(room_key) {
1607 room.subscribe_user(user_key);
1608 user.cache_room(room_key);
1609 }
1610 }
1611 }
1612
1613 pub(crate) fn room_remove_user(&mut self, room_key: &RoomKey, user_key: &UserKey) {
1615 if let Some(user) = self.users.get_mut(user_key) {
1616 if let Some(room) = self.rooms.get_mut(room_key) {
1617 room.unsubscribe_user(user_key);
1618 user.uncache_room(room_key);
1619 }
1620 }
1621 }
1622
1623 pub(crate) fn room_users_count(&self, room_key: &RoomKey) -> usize {
1625 if let Some(room) = self.rooms.get(room_key) {
1626 return room.users_count();
1627 }
1628 0
1629 }
1630
1631 pub(crate) fn room_user_keys(&self, room_key: &RoomKey) -> impl Iterator<Item = &UserKey> {
1633 let iter = if let Some(room) = self.rooms.get(room_key) {
1634 Some(room.user_keys())
1635 } else {
1636 None
1637 };
1638 iter.into_iter().flatten()
1639 }
1640
1641 pub(crate) fn room_entities(&self, room_key: &RoomKey) -> impl Iterator<Item = &E> {
1642 let iter = if let Some(room) = self.rooms.get(room_key) {
1643 Some(room.entities())
1644 } else {
1645 None
1646 };
1647 iter.into_iter().flatten()
1648 }
1649
1650 pub(crate) fn room_broadcast_message(
1652 &mut self,
1653 channel_kind: &ChannelKind,
1654 room_key: &RoomKey,
1655 message_box: Box<dyn Message>,
1656 ) {
1657 if let Some(room) = self.rooms.get(room_key) {
1658 let user_keys: Vec<UserKey> = room.user_keys().cloned().collect();
1659 for user_key in &user_keys {
1660 self.send_message_inner(user_key, channel_kind, message_box.clone())
1661 }
1662 }
1663 }
1664
1665 pub(crate) fn room_has_entity(&self, room_key: &RoomKey, entity: &E) -> bool {
1670 let Some(room) = self.rooms.get(room_key) else {
1671 return false;
1672 };
1673 return room.has_entity(entity);
1674 }
1675
1676 pub(crate) fn room_add_entity(&mut self, room_key: &RoomKey, entity: &E) {
1680 let mut is_some = false;
1681 if let Some(room) = self.rooms.get_mut(room_key) {
1682 room.add_entity(entity);
1683 is_some = true;
1684 }
1685 if !is_some {
1686 return;
1687 }
1688 self.entity_room_map.entity_add_room(entity, room_key);
1689 }
1690
1691 pub(crate) fn room_remove_entity(&mut self, room_key: &RoomKey, entity: &E) {
1693 if let Some(room) = self.rooms.get_mut(room_key) {
1694 room.remove_entity(entity, false);
1695 self.entity_room_map.remove_from_room(entity, room_key);
1696 }
1697 }
1698
1699 fn room_remove_all_entities(&mut self, room_key: &RoomKey) {
1701 if let Some(room) = self.rooms.get_mut(room_key) {
1702 let entities: Vec<E> = room.entities().copied().collect();
1703 for entity in entities {
1704 room.remove_entity(&entity, false);
1705 self.entity_room_map.remove_from_room(&entity, room_key);
1706 }
1707 }
1708 }
1709
1710 pub(crate) fn room_entities_count(&self, room_key: &RoomKey) -> usize {
1712 if let Some(room) = self.rooms.get(room_key) {
1713 return room.entities_count();
1714 }
1715 0
1716 }
1717
1718 fn maintain_socket<W: WorldMutType<E>>(&mut self, mut world: W, now: &Instant) {
1722 self.handle_disconnects(&mut world);
1723 self.handle_heartbeats();
1724 self.handle_pings();
1725 self.handle_empty_acks();
1726
1727 let mut addresses: HashSet<SocketAddr> = HashSet::new();
1728
1729 if let Some((_, auth_receiver)) = self.auth_io.as_mut() {
1731 loop {
1732 match auth_receiver.receive() {
1733 Ok(Some((auth_addr, auth_bytes))) => {
1734 let user_key = self.users.insert(User::new(auth_addr));
1736
1737 let mut reader = BitReader::new(auth_bytes);
1739 let Ok(auth_message) = self
1740 .protocol
1741 .message_kinds
1742 .read(&mut reader, &FakeEntityConverter)
1743 else {
1744 warn!("Server Error: cannot read auth message");
1745 continue;
1746 };
1747
1748 self.incoming_events.push_auth(&user_key, auth_message);
1750 }
1751 Ok(None) => {
1752 break;
1754 }
1755 Err(_) => {
1756 self.incoming_events.push_error(NaiaServerError::RecvError);
1757 }
1758 }
1759 }
1760 }
1761
1762 loop {
1764 match self.io.recv_reader() {
1765 Ok(Some((address, owned_reader))) => {
1766 let mut reader = owned_reader.borrow();
1768
1769 let Ok(header) = StandardHeader::de(&mut reader) else {
1771 continue;
1774 };
1775
1776 match header.packet_type {
1777 PacketType::Data => {
1778 addresses.insert(address);
1779
1780 if self
1781 .read_data_packet(&address, &header, &mut reader)
1782 .is_err()
1783 {
1784 warn!("Server Error: cannot read malformed packet");
1785 continue;
1786 }
1787 }
1788 PacketType::Ping => {
1789 let response = self.time_manager.process_ping(&mut reader).unwrap();
1790 if self.io.send_packet(&address, response.to_packet()).is_err() {
1792 warn!("Server Error: Cannot send pong packet to {}", address);
1794 continue;
1795 };
1796
1797 if let Some(connection) = self.user_connections.get_mut(&address) {
1798 connection.process_incoming_header(&header);
1799 connection.base.mark_heard();
1800 }
1801
1802 continue;
1803 }
1804 PacketType::Heartbeat => {
1805 if let Some(connection) = self.user_connections.get_mut(&address) {
1806 connection.process_incoming_header(&header);
1807 connection.base.mark_heard();
1808 }
1809
1810 continue;
1811 }
1812 PacketType::Pong => {
1813 if let Some(connection) = self.user_connections.get_mut(&address) {
1814 connection.process_incoming_header(&header);
1815 connection.base.mark_heard();
1816 connection
1817 .ping_manager
1818 .process_pong(&self.time_manager, &mut reader);
1819 }
1820
1821 continue;
1822 }
1823 PacketType::Handshake => {
1824 match self.handshake_manager.maintain_handshake(
1825 &address,
1826 &mut reader,
1827 self.user_connections.contains_key(&address),
1828 ) {
1829 Ok(HandshakeAction::None) => {}
1830 Ok(HandshakeAction::FinalizeConnection(
1831 user_key,
1832 validate_packet,
1833 )) => {
1834 self.finalize_connection(&user_key, &address);
1835 if self.io.send_packet(&address, validate_packet).is_err() {
1836 warn!(
1838 "Server Error: Cannot send validation packet to {}",
1839 &address
1840 );
1841 }
1842 }
1843 Ok(HandshakeAction::SendPacket(packet)) => {
1844 if self.io.send_packet(&address, packet).is_err() {
1845 warn!("Server Error: Cannot send packet to {}", &address);
1847 }
1848 }
1849 Ok(HandshakeAction::DisconnectUser(user_key)) => {
1850 self.user_disconnect(&user_key, &mut world);
1851 }
1852 Err(_err) => {
1853 warn!("Server Error: cannot read malformed packet");
1854 }
1855 }
1856 }
1857 }
1858 }
1859 Ok(None) => {
1860 break;
1862 }
1863 Err(error) => {
1864 self.incoming_events
1865 .push_error(NaiaServerError::Wrapped(Box::new(error)));
1866 }
1867 }
1868 }
1869
1870 for address in addresses {
1871 self.process_packets(&address, &mut world, now);
1872 }
1873 }
1874
1875 fn read_data_packet(
1876 &mut self,
1877 address: &SocketAddr,
1878 header: &StandardHeader,
1879 reader: &mut BitReader,
1880 ) -> Result<(), SerdeErr> {
1881 if header.packet_type != PacketType::Data {
1882 panic!("Server Error: received non-data packet in data packet handler");
1883 }
1884
1885 let Some(connection) = self.user_connections.get_mut(address) else {
1887 return Ok(());
1888 };
1889
1890 connection.base.mark_heard();
1892
1893 connection.base.mark_should_send_empty_ack();
1895
1896 connection.process_incoming_header(header);
1898
1899 let client_tick = Tick::de(reader)?;
1901
1902 let server_tick = self.time_manager.current_tick();
1903
1904 connection.read_packet(
1906 &self.protocol,
1907 server_tick,
1908 client_tick,
1909 reader,
1910 &mut self.global_world_manager,
1911 )?;
1912
1913 return Ok(());
1914 }
1915
1916 fn process_packets<W: WorldMutType<E>>(
1917 &mut self,
1918 address: &SocketAddr,
1919 world: &mut W,
1920 now: &Instant,
1921 ) {
1922 let (user_key, response_events) = {
1924 let Some(connection) = self.user_connections.get_mut(address) else {
1925 return;
1926 };
1927 (
1928 connection.user_key,
1929 connection.process_packets(
1930 &self.protocol,
1931 now,
1932 &mut self.global_world_manager,
1933 &mut self.global_request_manager,
1934 &mut self.global_response_manager,
1935 world,
1936 &mut self.incoming_events,
1937 ),
1938 )
1939 };
1940 self.process_response_events(world, &user_key, response_events);
1941 }
1942
1943 fn process_response_events<W: WorldMutType<E>>(
1944 &mut self,
1945 world: &mut W,
1946 user_key: &UserKey,
1947 response_events: Vec<EntityResponseEvent<E>>,
1948 ) {
1949 let mut deferred_events = Vec::new();
1950 for response_event in response_events {
1951 match response_event {
1952 EntityResponseEvent::SpawnEntity(entity) => {
1953 self.global_world_manager
1954 .spawn_entity_record(&entity, EntityOwner::Client(*user_key));
1955 let user = self.users.get(user_key).unwrap();
1956 if !user.has_address() {
1957 continue;
1958 }
1959 let connection = self.user_connections.get_mut(&user.address()).unwrap();
1960 let local_entity = connection
1961 .base
1962 .local_world_manager
1963 .entity_to_remote_entity(&entity)
1964 .unwrap();
1965 connection
1966 .base
1967 .remote_world_manager
1968 .on_entity_channel_opened(&local_entity);
1969 }
1970 EntityResponseEvent::InsertComponent(entity, component_kind) => {
1971 self.global_world_manager
1972 .insert_component_record(&entity, &component_kind);
1973 if self
1974 .global_world_manager
1975 .entity_is_public_and_client_owned(&entity)
1976 || self.global_world_manager.entity_is_delegated(&entity)
1977 {
1978 world.component_publish(
1979 &self.global_world_manager,
1980 &entity,
1981 &component_kind,
1982 );
1983
1984 if self.global_world_manager.entity_is_delegated(&entity) {
1985 world.component_enable_delegation(
1986 &self.global_world_manager,
1987 &entity,
1988 &component_kind,
1989 );
1990
1991 let user = self.users.get(user_key).unwrap();
1993 if !user.has_address() {
1994 continue;
1995 }
1996 let addr = user.address();
1997 let connection = self.user_connections.get_mut(&addr).unwrap();
1998 connection
1999 .base
2000 .host_world_manager
2001 .track_remote_component(&entity, &component_kind);
2002 }
2003
2004 self.insert_new_component_into_entity_scopes(
2005 &entity,
2006 &component_kind,
2007 Some(user_key),
2008 );
2009 }
2010 }
2011 EntityResponseEvent::RemoveComponent(entity, component_kind) => {
2012 if self
2013 .global_world_manager
2014 .entity_is_public_and_client_owned(&entity)
2015 || self.global_world_manager.entity_is_delegated(&entity)
2016 {
2017 self.remove_component_worldless(&entity, &component_kind);
2018 } else {
2019 self.global_world_manager
2020 .remove_component_record(&entity, &component_kind);
2021 }
2022 }
2023 _ => {
2024 deferred_events.push(response_event);
2025 }
2026 }
2027 }
2028
2029 let mut extra_deferred_events = Vec::new();
2030 for response_event in deferred_events {
2032 match response_event {
2033 EntityResponseEvent::PublishEntity(entity) => {
2034 info!("received publish entity message!");
2035 self.publish_entity(world, &entity, false);
2036 self.incoming_events.push_publish(user_key, &entity);
2037 }
2038 EntityResponseEvent::UnpublishEntity(entity) => {
2039 self.unpublish_entity(world, &entity, false);
2040 self.incoming_events.push_unpublish(user_key, &entity);
2041 }
2042 EntityResponseEvent::EnableDelegationEntity(entity) => {
2043 info!("received enable delegation entity message!");
2044 self.entity_enable_delegation(world, &entity, Some(*user_key));
2045 self.incoming_events.push_delegate(user_key, &entity);
2046 }
2047 EntityResponseEvent::EnableDelegationEntityResponse(entity) => {
2048 self.entity_enable_delegation_response(user_key, &entity);
2049 }
2050 EntityResponseEvent::DisableDelegationEntity(_) => {
2051 panic!("Clients should not be able to disable entity delegation.");
2052 }
2053 EntityResponseEvent::EntityRequestAuthority(world_entity, remote_entity) => {
2054 self.client_request_authority(user_key, &world_entity, &remote_entity);
2055 }
2056 EntityResponseEvent::EntityReleaseAuthority(entity) => {
2057 self.entity_release_authority(Some(user_key), &entity);
2059 self.incoming_events.push_auth_reset(&entity);
2060 }
2061 EntityResponseEvent::EntityUpdateAuthority(_, _) => {
2062 panic!("Clients should not be able to update entity authority.");
2063 }
2064 EntityResponseEvent::EntityMigrateResponse(_, _) => {
2065 panic!("Clients should not be able to send this message");
2066 }
2067 _ => {
2068 extra_deferred_events.push(response_event);
2069 }
2070 }
2071 }
2072
2073 for response_event in extra_deferred_events {
2074 match response_event {
2075 EntityResponseEvent::DespawnEntity(entity) => {
2076 if self
2077 .global_world_manager
2078 .entity_is_public_and_client_owned(&entity)
2079 || self.global_world_manager.entity_is_delegated(&entity)
2080 {
2081 let user = self.users.get(user_key).unwrap();
2083 if !user.has_address() {
2084 continue;
2085 }
2086 let connection = self.user_connections.get_mut(&user.address()).unwrap();
2087 connection
2088 .base
2089 .host_world_manager
2090 .client_initiated_despawn(&entity);
2091
2092 self.despawn_entity_worldless(&entity);
2093 } else {
2094 self.global_world_manager.remove_entity_record(&entity);
2095 }
2096 }
2097 _ => {
2098 panic!("shouldn't happen");
2099 }
2100 }
2101 }
2102 }
2103
2104 fn handle_disconnects<W: WorldMutType<E>>(&mut self, world: &mut W) {
2105 if self.timeout_timer.ringing() {
2107 self.timeout_timer.reset();
2108
2109 let mut user_disconnects: Vec<UserKey> = Vec::new();
2110
2111 for (_, connection) in &mut self.user_connections.iter_mut() {
2112 if connection.base.should_drop() || connection.manual_disconnect {
2114 user_disconnects.push(connection.user_key);
2115 continue;
2116 }
2117 }
2118
2119 for user_key in user_disconnects {
2120 self.user_disconnect(&user_key, world);
2121 }
2122 }
2123 }
2124
2125 fn handle_heartbeats(&mut self) {
2126 if self.heartbeat_timer.ringing() {
2128 self.heartbeat_timer.reset();
2129
2130 for (user_address, connection) in &mut self.user_connections.iter_mut() {
2131 if connection.base.should_send_heartbeat() {
2133 Self::send_heartbeat_packet(
2134 user_address,
2135 connection,
2136 &self.time_manager,
2137 &mut self.io,
2138 );
2139 }
2140 }
2141 }
2142 }
2143
2144 fn handle_empty_acks(&mut self) {
2145 for (user_address, connection) in &mut self.user_connections.iter_mut() {
2148 if connection.base.should_send_empty_ack() {
2149 Self::send_heartbeat_packet(
2150 user_address,
2151 connection,
2152 &self.time_manager,
2153 &mut self.io,
2154 );
2155 }
2156 }
2157 }
2158
2159 fn send_heartbeat_packet(
2160 user_address: &SocketAddr,
2161 connection: &mut Connection<E>,
2162 time_manager: &TimeManager,
2163 io: &mut Io,
2164 ) {
2165 let mut writer = BitWriter::new();
2168
2169 connection
2171 .base
2172 .write_header(PacketType::Heartbeat, &mut writer);
2173
2174 time_manager.current_tick().ser(&mut writer);
2176
2177 time_manager.current_tick_instant().ser(&mut writer);
2179
2180 if io.send_packet(user_address, writer.to_packet()).is_err() {
2182 warn!(
2184 "Server Error: Cannot send heartbeat packet to {}",
2185 user_address
2186 );
2187 }
2188 connection.base.mark_sent();
2189 }
2190
2191 fn handle_pings(&mut self) {
2192 if self.ping_timer.ringing() {
2194 self.ping_timer.reset();
2195
2196 for (user_address, connection) in &mut self.user_connections.iter_mut() {
2197 if connection.ping_manager.should_send_ping() {
2199 let mut writer = BitWriter::new();
2200
2201 connection.base.write_header(PacketType::Ping, &mut writer);
2203
2204 self.time_manager.current_tick().ser(&mut writer);
2206
2207 self.time_manager.current_tick_instant().ser(&mut writer);
2209
2210 connection
2212 .ping_manager
2213 .write_ping(&mut writer, &self.time_manager);
2214
2215 if self
2217 .io
2218 .send_packet(user_address, writer.to_packet())
2219 .is_err()
2220 {
2221 warn!("Server Error: Cannot send ping packet to {}", user_address);
2223 }
2224 connection.base.mark_sent();
2225 }
2226 }
2227 }
2228 }
2229
2230 fn update_entity_scopes<W: WorldRefType<E>>(&mut self, world: &W) {
2233 for (_, room) in self.rooms.iter_mut() {
2234 while let Some((removed_user, removed_entity)) = room.pop_entity_removal_queue() {
2235 let Some(user) = self.users.get(&removed_user) else {
2236 continue;
2237 };
2238 if !user.has_address() {
2239 continue;
2240 }
2241 let Some(connection) = self.user_connections.get_mut(&user.address()) else {
2242 continue;
2243 };
2244
2245 if let Some(entity_rooms) = self.entity_room_map.entity_get_rooms(&removed_entity) {
2248 let user_rooms = user.room_keys();
2249 let has_room_in_common = entity_rooms.intersection(user_rooms).next().is_some();
2250 if has_room_in_common {
2251 continue;
2252 }
2253 }
2254
2255 if connection
2257 .base
2258 .host_world_manager
2259 .host_has_entity(&removed_entity)
2260 {
2261 connection
2263 .base
2264 .host_world_manager
2265 .despawn_entity(&removed_entity);
2266 }
2267 }
2268 }
2269
2270 for (_, room) in self.rooms.iter_mut() {
2271 for user_key in room.user_keys() {
2274 let Some(user) = self.users.get(user_key) else {
2275 continue;
2276 };
2277 if !user.has_address() {
2278 continue;
2279 }
2280 let Some(connection) = self.user_connections.get_mut(&user.address()) else {
2281 continue;
2282 };
2283 for entity in room.entities() {
2284 if !world.has_entity(entity) {
2285 continue;
2286 }
2287 if self
2288 .global_world_manager
2289 .entity_is_public_and_owned_by_user(user_key, entity)
2290 {
2291 continue;
2293 }
2294
2295 let currently_in_scope =
2296 connection.base.host_world_manager.host_has_entity(entity);
2297
2298 let should_be_in_scope =
2299 if let Some(in_scope) = self.entity_scope_map.get(user_key, entity) {
2300 *in_scope
2301 } else {
2302 false
2303 };
2304
2305 if should_be_in_scope {
2306 if currently_in_scope {
2307 continue;
2308 }
2309 let component_kinds =
2310 self.global_world_manager.component_kinds(entity).unwrap();
2311 connection.base.host_world_manager.init_entity(
2313 &mut connection.base.local_world_manager,
2314 entity,
2315 component_kinds,
2316 );
2317
2318 if !self.global_world_manager.entity_is_delegated(entity) {
2320 continue;
2321 }
2322 let event_message = EntityEventMessage::new_enable_delegation(
2323 &self.global_world_manager,
2324 entity,
2325 );
2326 let mut converter = EntityConverterMut::new(
2327 &self.global_world_manager,
2328 &mut connection.base.local_world_manager,
2329 );
2330 let channel_kind = ChannelKind::of::<SystemChannel>();
2331 let message =
2332 MessageContainer::from_write(Box::new(event_message), &mut converter);
2333 connection.base.message_manager.send_message(
2334 &self.protocol.message_kinds,
2335 &mut converter,
2336 &channel_kind,
2337 message,
2338 );
2339 } else if currently_in_scope {
2340 connection.base.host_world_manager.despawn_entity(entity);
2342 }
2343 }
2344 }
2345 }
2346 }
2347}
2348
2349impl<E: Copy + Eq + Hash + Send + Sync> EntityAndGlobalEntityConverter<E> for Server<E> {
2350 fn global_entity_to_entity(
2351 &self,
2352 global_entity: &GlobalEntity,
2353 ) -> Result<E, EntityDoesNotExistError> {
2354 self.global_world_manager
2355 .global_entity_to_entity(global_entity)
2356 }
2357
2358 fn entity_to_global_entity(&self, entity: &E) -> Result<GlobalEntity, EntityDoesNotExistError> {
2359 self.global_world_manager.entity_to_global_entity(entity)
2360 }
2361}