1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 hash::Hash,
4 net::SocketAddr,
5 time::Duration,
6};
7
8#[cfg(feature = "bench_instrumentation")]
12pub mod bench_take_events_counters {
13 use std::sync::atomic::{AtomicU64, Ordering};
14 #[doc(hidden)] pub static NS_HOST_REMOTE_COMMANDS: AtomicU64 = AtomicU64::new(0);
15 #[doc(hidden)] pub static NS_SENDER_COLLECT: AtomicU64 = AtomicU64::new(0);
16 #[doc(hidden)] pub static NS_TAKE_UPDATE_EVENTS: AtomicU64 = AtomicU64::new(0);
17
18 pub fn reset() {
20 NS_HOST_REMOTE_COMMANDS.store(0, Ordering::Relaxed);
21 NS_SENDER_COLLECT.store(0, Ordering::Relaxed);
22 NS_TAKE_UPDATE_EVENTS.store(0, Ordering::Relaxed);
23 }
24 pub fn snapshot() -> (u64, u64, u64) {
26 (
27 NS_HOST_REMOTE_COMMANDS.load(Ordering::Relaxed),
28 NS_SENDER_COLLECT.load(Ordering::Relaxed),
29 NS_TAKE_UPDATE_EVENTS.load(Ordering::Relaxed),
30 )
31 }
32}
33
34#[cfg(feature = "bench_instrumentation")]
41pub mod cmd_emission_counters {
42 use std::sync::atomic::{AtomicU64, Ordering};
43
44 #[doc(hidden)] pub static SPAWN: AtomicU64 = AtomicU64::new(0);
45 #[doc(hidden)] pub static SPAWN_WITH_COMPONENTS: AtomicU64 = AtomicU64::new(0);
46 #[doc(hidden)] pub static DESPAWN: AtomicU64 = AtomicU64::new(0);
47 #[doc(hidden)] pub static INSERT_COMPONENT: AtomicU64 = AtomicU64::new(0);
48 #[doc(hidden)] pub static REMOVE_COMPONENT: AtomicU64 = AtomicU64::new(0);
49 #[doc(hidden)] pub static NOOP: AtomicU64 = AtomicU64::new(0);
50 #[doc(hidden)] pub static OTHER: AtomicU64 = AtomicU64::new(0);
51
52 pub static PAYLOAD_COMPONENTS: AtomicU64 = AtomicU64::new(0);
57
58 pub fn reset() {
60 SPAWN.store(0, Ordering::Relaxed);
61 SPAWN_WITH_COMPONENTS.store(0, Ordering::Relaxed);
62 DESPAWN.store(0, Ordering::Relaxed);
63 INSERT_COMPONENT.store(0, Ordering::Relaxed);
64 REMOVE_COMPONENT.store(0, Ordering::Relaxed);
65 NOOP.store(0, Ordering::Relaxed);
66 OTHER.store(0, Ordering::Relaxed);
67 PAYLOAD_COMPONENTS.store(0, Ordering::Relaxed);
68 }
69
70 #[derive(Debug, Clone, Copy)]
72 pub struct CmdEmissionSnapshot {
73 pub spawn: u64,
75 pub spawn_with_components: u64,
77 pub despawn: u64,
79 pub insert_component: u64,
81 pub remove_component: u64,
83 pub noop: u64,
85 pub other: u64,
87 pub payload_components: u64,
89 }
90
91 pub fn snapshot() -> CmdEmissionSnapshot {
93 CmdEmissionSnapshot {
94 spawn: SPAWN.load(Ordering::Relaxed),
95 spawn_with_components: SPAWN_WITH_COMPONENTS.load(Ordering::Relaxed),
96 despawn: DESPAWN.load(Ordering::Relaxed),
97 insert_component: INSERT_COMPONENT.load(Ordering::Relaxed),
98 remove_component: REMOVE_COMPONENT.load(Ordering::Relaxed),
99 noop: NOOP.load(Ordering::Relaxed),
100 other: OTHER.load(Ordering::Relaxed),
101 payload_components: PAYLOAD_COMPONENTS.load(Ordering::Relaxed),
102 }
103 }
104}
105
106use log::info;
107use naia_socket_shared::Instant;
108
109use crate::world::sync::RemoteEntityChannel;
110use crate::world::update::entity_update_manager::EntityUpdateManager;
111use crate::{
112 messages::channels::receivers::reliable_receiver::ReliableReceiver,
113 sequence_list::SequenceList,
114 types::{HostType, PacketIndex},
115 world::{
116 entity::entity_converters::GlobalWorldManagerType,
117 host::host_world_manager::{CommandId, HostWorldManager},
118 remote::remote_entity_waitlist::{RemoteEntityWaitlist, WaitlistStore},
119 sync::HostEntityChannel,
120 },
121 ChannelSender, ComponentKind, ComponentKinds, ComponentUpdate, DiffMask,
122 EntityAndGlobalEntityConverter, EntityAuthStatus, EntityCommand, EntityConverterMut,
123 EntityEvent, EntityMessage, EntityMessageType, GlobalEntity, GlobalEntitySpawner, HostEntity,
124 InScopeEntities, LocalEntityAndGlobalEntityConverter, LocalEntityMap, MessageIndex,
125 OwnedLocalEntity, PacketNotifiable, ReliableSender, RemoteEntity, RemoteWorldManager,
126 Replicate, Tick, WorldMutType, WorldRefType,
127};
128
129cfg_if! {
130 if #[cfg(feature = "e2e_debug")] {
131 use crate::world::{
132 host::host_world_manager::SubCommandId,
133 sync::remote_entity_channel::EntityChannelState,
134 };
135 }
136}
137
138const RESEND_COMMAND_RTT_FACTOR: f32 = 1.5;
139const COMMAND_RECORD_TTL: Duration = Duration::from_secs(60);
140
141type SentCommandPackets = SequenceList<(Instant, Vec<(CommandId, EntityMessage<OwnedLocalEntity>)>)>;
142type OutgoingEvents = (VecDeque<(CommandId, EntityCommand)>, HashMap<GlobalEntity, HashSet<ComponentKind>>);
143
144pub struct LocalWorldManager {
146 entity_map: LocalEntityMap,
147 sender: ReliableSender<EntityCommand>,
148 sent_command_packets: SentCommandPackets,
149 receiver: ReliableReceiver<EntityMessage<OwnedLocalEntity>>,
150
151 host: HostWorldManager,
152 remote: RemoteWorldManager,
153 updater: EntityUpdateManager,
154
155 paused_entities: HashSet<GlobalEntity>,
158
159 incoming_components: HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
161
162 incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
164}
165
166impl LocalWorldManager {
167 pub fn new(
169 address: &Option<SocketAddr>,
170 host_type: HostType,
171 user_key: u64,
172 global_world_manager: &dyn GlobalWorldManagerType,
173 ) -> Self {
174 Self {
175 entity_map: LocalEntityMap::new(host_type),
176 sender: ReliableSender::new(RESEND_COMMAND_RTT_FACTOR, None),
177 sent_command_packets: SequenceList::new(),
178 receiver: ReliableReceiver::new(),
179
180 host: HostWorldManager::new(host_type, user_key),
181 remote: RemoteWorldManager::new(host_type),
182 updater: EntityUpdateManager::new(address, global_world_manager),
183
184 paused_entities: HashSet::new(),
185
186 incoming_components: HashMap::new(),
187 incoming_updates: Vec::new(),
188 }
189 }
190
191 pub(crate) fn entity_waitlist_queue<T>(
192 &mut self,
193 remote_entity_set: &HashSet<RemoteEntity>,
194 waitlist_store: &mut WaitlistStore<T>,
195 message: T,
196 ) {
197 self.remote
198 .entity_waitlist_queue(remote_entity_set, waitlist_store, message);
199 }
200
201 pub fn entity_converter(&self) -> &dyn LocalEntityAndGlobalEntityConverter {
205 self.entity_map.entity_converter()
206 }
207
208 pub fn entity_converter_mut<'a, 'b>(
210 &'b mut self,
211 global_world_manager: &'a dyn GlobalWorldManagerType,
212 ) -> EntityConverterMut<'a, 'b> {
213 self.host
214 .entity_converter_mut(global_world_manager, &mut self.entity_map)
215 }
216
217 pub fn has_global_entity(&self, global_entity: &GlobalEntity) -> bool {
219 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
220 return false;
221 };
222 self.has_local_entity(&local_entity)
223 }
224
225 pub fn has_local_entity(&self, local_entity: &OwnedLocalEntity) -> bool {
227 match local_entity {
228 OwnedLocalEntity::Host { id, is_static: true } => {
229 self.host.has_entity(&HostEntity::new_static(*id))
230 }
231 OwnedLocalEntity::Host { id, is_static: false } => {
232 self.host.has_entity(&HostEntity::new(*id))
233 }
234 OwnedLocalEntity::Remote { id, is_static } => {
235 let remote = if *is_static { RemoteEntity::new_static(*id) } else { RemoteEntity::new(*id) };
236 self.remote.has_entity(&remote)
237 }
238 }
239 }
240
241 pub fn get_host_entity_channel(
243 &self,
244 entity: &HostEntity,
245 ) -> Option<&crate::world::sync::HostEntityChannel> {
246 self.host.get_entity_channel(entity)
247 }
248
249 pub fn get_host_entity_channel_mut(
251 &mut self,
252 entity: &HostEntity,
253 ) -> Option<&mut crate::world::sync::HostEntityChannel> {
254 self.host.get_entity_channel_mut(entity)
255 }
256
257 pub fn has_host_entity(&self, host_entity: &HostEntity) -> bool {
261 self.host.has_entity(host_entity)
262 }
263
264 pub fn host_init_entity(
266 &mut self,
267 global_entity: &GlobalEntity,
268 component_kinds: Vec<ComponentKind>,
269 component_kinds_map: &ComponentKinds,
270 is_static: bool,
271 ) {
272 if let Ok(existing_host_entity) =
281 self.entity_map.global_entity_to_host_entity(global_entity)
282 {
283 let channel_alive = self
284 .host
285 .get_host_world()
286 .contains_key(&existing_host_entity);
287 if !channel_alive {
288 self.entity_map.remove_by_global_entity(global_entity);
292 }
293 }
294
295 if self
296 .entity_map
297 .global_entity_to_host_entity(global_entity)
298 .is_err()
299 {
300 if is_static {
302 let host_entity = self.host.host_generate_static_entity();
303 self.entity_map
304 .insert_with_static_host_entity(*global_entity, host_entity);
305 } else {
306 let host_entity = self.host.host_generate_entity();
307 self.entity_map
308 .insert_with_host_entity(*global_entity, host_entity);
309 }
310 }
311
312 if is_static {
313 self.host.init_static_entity_send_host_commands(
314 &self.entity_map,
315 global_entity,
316 component_kinds,
317 );
318 } else {
319 self.host.init_entity_send_host_commands(
320 &self.entity_map,
321 global_entity,
322 component_kinds,
323 &mut self.updater,
324 component_kinds_map,
325 );
326 }
327 }
328
329 pub fn migrate_entity_remote_to_host(
351 &mut self,
352 global_entity: &GlobalEntity,
353 ) -> Result<HostEntity, String> {
354 let Some(local_entity_record) = self.entity_map.remove_by_global_entity(global_entity)
356 else {
357 return Err(format!(
358 "Entity does not exist in local entity map: {:?}",
359 global_entity
360 ));
361 };
362
363 if !local_entity_record.is_remote_owned() {
364 self.entity_map
366 .insert_with_remote_entity(*global_entity, local_entity_record.remote_entity());
367 return Err(format!("Entity is not remote-owned: {:?}", global_entity));
368 }
369 let old_remote_entity = local_entity_record.remote_entity();
370
371 let new_host_entity = self.host.host_generate_entity();
373
374 self.entity_map
375 .insert_with_host_entity(*global_entity, new_host_entity);
376
377 self.entity_map
382 .remove_remote_mapping_if_exists(&old_remote_entity);
383
384 debug_assert!(
387 self.entity_map
388 .entity_converter()
389 .global_entity_to_remote_entity(global_entity)
390 .is_err(),
391 "After migration, global_entity_to_remote_entity must fail for migrated entity"
392 );
393
394 self.remote.force_drain_entity_buffers(&old_remote_entity);
397
398 let component_kinds = self.remote.extract_component_kinds(&old_remote_entity);
401
402 let _old_remote_channel = self.remote.remove_entity_channel(&old_remote_entity);
405
406 let new_host_channel =
409 HostEntityChannel::new_with_components(self.entity_map.host_type(), component_kinds);
410
411 self.host
414 .insert_entity_channel(new_host_entity, new_host_channel);
415
416 let old_entity = old_remote_entity.copy_to_owned();
419 let new_entity = OwnedLocalEntity::Host { id: new_host_entity.value(), is_static: false };
420 self.entity_map
421 .install_entity_redirect(old_entity, new_entity);
422
423 self.update_sent_command_entity_refs(global_entity, old_entity, new_entity);
426
427 self.remote
430 .despawn_entity(&mut self.entity_map, &old_remote_entity);
431
432 Ok(new_host_entity)
433 }
434
435 pub fn host_send_enable_delegation(&mut self, global_entity: &GlobalEntity) {
437 let command = EntityCommand::EnableDelegation(None, *global_entity);
438 self.host.send_command(&self.entity_map, command);
439 }
440
441 pub fn host_local_enable_delegation(&mut self, host_entity: &HostEntity) {
443 let Some(channel) = self.host.get_entity_channel_mut(host_entity) else {
444 panic!(
445 "Cannot enable delegation on non-existent HostEntity: {:?}",
446 host_entity
447 );
448 };
449 channel.local_enable_delegation();
450 }
451
452 pub fn host_send_migrate_response(
454 &mut self,
455 global_entity: &GlobalEntity,
456 old_remote_entity: &RemoteEntity, new_host_entity: &HostEntity, ) {
459 let command = EntityCommand::MigrateResponse(
462 None,
463 *global_entity,
464 *old_remote_entity,
465 *new_host_entity,
466 );
467 self.host.send_command(&self.entity_map, command);
468 }
469
470 #[track_caller]
471 pub fn host_send_set_auth(
473 &mut self,
474 global_entity: &GlobalEntity,
475 auth_status: EntityAuthStatus,
476 ) {
477 #[cfg(feature = "e2e_debug")]
478 {
479 crate::e2e_trace!(
480 "[SERVER_SEND] SetAuthority entity={:?} status={:?}",
481 global_entity,
482 auth_status
483 );
484 }
485 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
486 panic!("Attempting to send SetAuthority for entity which does not exist in local entity map! {:?}", global_entity);
487 };
488
489 let command = EntityCommand::SetAuthority(None, *global_entity, auth_status);
490 if local_entity.is_host() {
491 self.host.send_command(&self.entity_map, command);
492 } else {
493 self.remote
495 .send_auth_command(self.entity_map.entity_converter(), command);
496 }
497 }
498
499 pub fn host_reserve_entity(&mut self, global_entity: &GlobalEntity) -> HostEntity {
501 self.host
502 .host_reserve_entity(&mut self.entity_map, global_entity)
503 }
504
505 pub fn host_remove_reserved_entity(
507 &mut self,
508 global_entity: &GlobalEntity,
509 ) -> Option<HostEntity> {
510 self.host.host_removed_reserved_entity(global_entity)
511 }
512
513 pub(crate) fn insert_sent_command_packet(&mut self, packet_index: &PacketIndex, now: Instant) {
514 if !self
515 .sent_command_packets
516 .contains_scan_from_back(packet_index)
517 {
518 self.sent_command_packets
519 .insert_scan_from_back(*packet_index, (now, Vec::new()));
520 }
521 }
522
523 pub(crate) fn record_command_written(
524 &mut self,
525 packet_index: &PacketIndex,
526 command_id: &CommandId,
527 message: EntityMessage<OwnedLocalEntity>,
528 ) {
529 #[cfg(feature = "bench_instrumentation")]
530 {
531 use std::sync::atomic::Ordering;
532 match &message {
533 EntityMessage::Spawn(_) => {
534 cmd_emission_counters::SPAWN.fetch_add(1, Ordering::Relaxed);
535 }
536 EntityMessage::SpawnWithComponents(_, kinds) => {
537 cmd_emission_counters::SPAWN_WITH_COMPONENTS
538 .fetch_add(1, Ordering::Relaxed);
539 cmd_emission_counters::PAYLOAD_COMPONENTS
540 .fetch_add(kinds.len() as u64, Ordering::Relaxed);
541 }
542 EntityMessage::Despawn(_) => {
543 cmd_emission_counters::DESPAWN.fetch_add(1, Ordering::Relaxed);
544 }
545 EntityMessage::InsertComponent(_, _) => {
546 cmd_emission_counters::INSERT_COMPONENT.fetch_add(1, Ordering::Relaxed);
547 }
548 EntityMessage::RemoveComponent(_, _) => {
549 cmd_emission_counters::REMOVE_COMPONENT.fetch_add(1, Ordering::Relaxed);
550 }
551 EntityMessage::Noop => {
552 cmd_emission_counters::NOOP.fetch_add(1, Ordering::Relaxed);
553 }
554 _ => {
555 cmd_emission_counters::OTHER.fetch_add(1, Ordering::Relaxed);
556 }
557 }
558 }
559
560 let (_, sent_actions_list) = self
561 .sent_command_packets
562 .get_mut_scan_from_back(packet_index)
563 .unwrap();
564 sent_actions_list.push((*command_id, message));
565 }
566
567 pub fn remote_entities(&self) -> Vec<GlobalEntity> {
571 self.entity_map.remote_entities()
572 }
573
574 #[cfg(feature = "e2e_debug")]
575 pub fn debug_remote_channel_diagnostic(
576 &self,
577 remote_entity: &RemoteEntity,
578 ) -> Option<(
579 EntityChannelState,
580 (SubCommandId, usize, Option<SubCommandId>, usize),
581 )> {
582 self.remote.debug_channel_diagnostic(remote_entity)
583 }
584
585 #[cfg(feature = "e2e_debug")]
586 pub fn debug_remote_channel_snapshot(
587 &self,
588 remote_entity: &RemoteEntity,
589 ) -> Option<(
590 EntityChannelState,
591 Option<MessageIndex>,
592 usize,
593 Option<(MessageIndex, EntityMessageType)>,
594 Option<MessageIndex>,
595 )> {
596 self.remote.debug_channel_snapshot(remote_entity)
597 }
598
599 pub fn send_enable_delegation_response(&mut self, global_entity: &GlobalEntity) {
601 let command = EntityCommand::EnableDelegationResponse(None, *global_entity);
602 self.remote.send_auth_command(&self.entity_map, command);
603 }
604
605 pub fn remote_send_request_auth(&mut self, global_entity: &GlobalEntity) {
607 let command = EntityCommand::RequestAuthority(None, *global_entity);
608 self.remote.send_auth_command(&self.entity_map, command);
609 }
610
611 pub fn remote_receive_set_auth(
613 &mut self,
614 global_entity: &GlobalEntity,
615 auth_status: EntityAuthStatus,
616 ) {
617 let remote_entity = self
618 .entity_map
619 .entity_converter()
620 .global_entity_to_remote_entity(global_entity)
621 .unwrap();
622 self.remote
623 .receive_set_auth_status(remote_entity, auth_status);
624 }
625
626 pub fn get_remote_entity_auth_status(
628 &self,
629 global_entity: &GlobalEntity,
630 ) -> Option<EntityAuthStatus> {
631 let Ok(owned) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
632 return None;
633 };
634 let OwnedLocalEntity::Remote { .. } = owned else {
635 return None;
636 };
637 self.remote
638 .get_entity_auth_status(&owned.take_remote())
639 }
640
641 pub fn entity_waitlist_mut(&mut self) -> &mut RemoteEntityWaitlist {
643 self.remote.entity_waitlist_mut()
644 }
645
646 pub fn receiver_buffer_message(
648 &mut self,
649 id: MessageIndex,
650 msg: EntityMessage<OwnedLocalEntity>,
651 ) {
652 self.receiver.buffer_message(id, msg);
662 }
663
664 pub(crate) fn insert_received_component(
665 &mut self,
666 local_entity: &OwnedLocalEntity,
667 component_kind: &ComponentKind,
668 component: Box<dyn Replicate>,
669 ) {
670 self.incoming_components
671 .insert((*local_entity, *component_kind), component);
672 }
673
674 pub(crate) fn insert_received_update(
675 &mut self,
676 tick: Tick,
677 local_entity: &OwnedLocalEntity,
678 component_update: ComponentUpdate,
679 ) {
680 self.incoming_updates
681 .push((tick, *local_entity, component_update));
682 }
683
684 pub fn take_incoming_events<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
686 &mut self,
687 spawner: &mut dyn GlobalEntitySpawner<E>,
688 global_world_manager: &dyn GlobalWorldManagerType,
689 component_kinds: &ComponentKinds,
690 world: &mut W,
691 now: &Instant,
692 ) -> Vec<EntityEvent> {
693 let incoming_messages = self.receiver.receive_messages();
694 let mut incoming_host_messages = Vec::new();
695 let mut incoming_remote_messages = Vec::new();
696
697 for (id, incoming_message) in incoming_messages {
698 if incoming_message.get_type() == EntityMessageType::Noop {
699 continue; }
701
702 let Some(local_entity) = incoming_message.entity() else {
710 panic!(
711 "Received message without an entity! Message: {:?}",
712 incoming_message
713 );
714 };
715 match local_entity {
716 OwnedLocalEntity::Host { id: host_entity, is_static } => {
717 let host_entity = if is_static { HostEntity::new_static(host_entity) } else { HostEntity::new(host_entity) };
719 incoming_host_messages.push((id, incoming_message.with_entity(host_entity)));
720 }
721 OwnedLocalEntity::Remote { .. } => {
722 let remote_entity = local_entity.take_remote();
724 #[cfg(feature = "e2e_debug")]
726 if incoming_message.get_type() == EntityMessageType::Spawn {
727 extern "Rust" {
728 fn client_routed_remote_spawn_increment();
729 }
730 unsafe {
733 client_routed_remote_spawn_increment();
734 }
735 }
736 incoming_remote_messages
737 .push((id, incoming_message.with_entity(remote_entity)));
738 }
739 }
740 }
741
742 let host_events = self.host.take_incoming_events(
743 spawner,
744 global_world_manager,
745 &self.entity_map,
746 world,
747 incoming_host_messages,
748 );
749 let mut remote_events = self.remote.take_incoming_events(
750 spawner,
751 global_world_manager,
752 &mut self.entity_map,
753 component_kinds,
754 world,
755 now,
756 &mut self.incoming_components,
757 std::mem::take(&mut self.incoming_updates),
758 incoming_remote_messages,
759 );
760
761 let mut incoming_events = host_events;
762 incoming_events.append(&mut remote_events);
763
764 incoming_events
765 }
766
767 pub fn register_authed_entity(
769 &mut self,
770 global_manager: &dyn GlobalWorldManagerType,
771 global_entity: &GlobalEntity,
772 ) {
773 if let Ok(remote_entity) = self
776 .entity_map
777 .global_entity_to_remote_entity(global_entity)
778 {
779 self.remote.register_authed_entity(&remote_entity);
780 }
781
782 let Some(component_kinds) = global_manager.component_kinds(global_entity) else {
783 return;
785 };
786
787 for component_kind in component_kinds.iter() {
788 self.updater
789 .register_component(global_entity, component_kind);
790 }
791 }
792
793 pub fn deregister_authed_entity(
795 &mut self,
796 global_manager: &dyn GlobalWorldManagerType,
797 global_entity: &GlobalEntity,
798 ) {
799 if let Ok(remote_entity) = self
802 .entity_map
803 .global_entity_to_remote_entity(global_entity)
804 {
805 self.remote.deregister_authed_entity(&remote_entity);
806 }
807
808 let Some(component_kinds) = global_manager.component_kinds(global_entity) else {
809 return;
811 };
812
813 for component_kind in component_kinds.iter() {
814 self.updater
815 .deregister_component(global_entity, component_kind);
816 }
817 }
818
819 pub fn remote_spawn_entity(&mut self, global_entity: &GlobalEntity) {
821 let remote_entity = self
822 .entity_map
823 .global_entity_to_remote_entity(global_entity)
824 .unwrap();
825 self.remote.spawn_entity(&remote_entity);
826 }
827
828 pub fn remote_despawn_entity(&mut self, global_entity: &GlobalEntity) {
830 let remote_entity = self
831 .entity_map
832 .global_entity_to_remote_entity(global_entity)
833 .unwrap();
834 self.remote
835 .despawn_entity(&mut self.entity_map, &remote_entity);
836 }
837
838 pub(crate) fn get_diff_mask(
841 &self,
842 global_entity: &GlobalEntity,
843 component_kind: &ComponentKind,
844 ) -> DiffMask {
845 self.updater.get_diff_mask(global_entity, component_kind)
846 }
847
848 pub(crate) fn record_update(
849 &mut self,
850 now: &Instant,
851 packet_index: &PacketIndex,
852 global_entity: &GlobalEntity,
853 component_kind: &ComponentKind,
854 diff_mask: DiffMask,
855 ) {
856 self.updater
857 .record_update(now, packet_index, global_entity, component_kind, diff_mask);
858 }
859
860 pub fn despawn_entity(&mut self, global_entity: &GlobalEntity) {
864 self.paused_entities.remove(global_entity);
866
867 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
868 panic!(
869 "Attempting to despawn entity which does not exist in local entity map! {:?}",
870 global_entity
871 );
872 };
873 if local_entity.is_host() {
874 self.host
875 .send_command(&self.entity_map, EntityCommand::Despawn(*global_entity));
876 } else {
877 self.remote
878 .send_entity_command(&self.entity_map, EntityCommand::Despawn(*global_entity));
879 }
880 }
881
882 pub fn pause_entity(&mut self, global_entity: &GlobalEntity) {
886 self.paused_entities.insert(*global_entity);
887 }
888
889 pub fn resume_entity(&mut self, global_entity: &GlobalEntity) {
892 self.paused_entities.remove(global_entity);
893 }
894
895 pub fn is_entity_paused(&self, global_entity: &GlobalEntity) -> bool {
897 self.paused_entities.contains(global_entity)
898 }
899
900 pub fn insert_component(
902 &mut self,
903 global_entity: &GlobalEntity,
904 component_kind: &ComponentKind,
905 ) {
906 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
907 panic!("Attempting to insert component for entity which does not exist in local entity map! {:?}", global_entity);
908 };
909 if local_entity.is_host() {
910 self.updater
913 .register_component(global_entity, component_kind);
914 self.host.send_command(
915 &self.entity_map,
916 EntityCommand::InsertComponent(*global_entity, *component_kind),
917 );
918 } else {
919 self.remote.send_entity_command(
920 &self.entity_map,
921 EntityCommand::InsertComponent(*global_entity, *component_kind),
922 );
923 }
924 }
925
926 pub fn remove_component(
928 &mut self,
929 global_entity: &GlobalEntity,
930 component_kind: &ComponentKind,
931 ) {
932 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
933 panic!("Attempting to remove component for entity which does not exist in local entity map! {:?}", global_entity);
934 };
935 if local_entity.is_host() {
936 self.host.send_command(
937 &self.entity_map,
938 EntityCommand::RemoveComponent(*global_entity, *component_kind),
939 );
940 } else {
941 self.remote.send_entity_command(
942 &self.entity_map,
943 EntityCommand::RemoveComponent(*global_entity, *component_kind),
944 );
945 }
946 }
947
948 pub fn send_publish(&mut self, host_type: HostType, global_entity: &GlobalEntity) {
950 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
951 panic!(
952 "Attempting to publish entity which does not exist in local entity map! {:?}",
953 global_entity
954 );
955 };
956 let host_owned = match (host_type, local_entity.is_host()) {
957 (HostType::Server, true) => {
958 panic!("Server-owned Entities are published by default, invalid!")
959 }
960 (HostType::Client, false) => {
961 panic!("Server-owned Entities are published by default, invalid!")
962 }
963 (HostType::Server, false) => false, (HostType::Client, true) => true, };
966
967 let command = EntityCommand::Publish(None, *global_entity);
968 if host_owned {
969 self.host.send_command(&self.entity_map, command);
970 } else {
971 self.remote
972 .send_auth_command(self.entity_map.entity_converter(), command);
973 }
974 }
975
976 pub fn send_unpublish(&mut self, host_type: HostType, global_entity: &GlobalEntity) {
978 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
979 panic!(
980 "Attempting to publish entity which does not exist in local entity map! {:?}",
981 global_entity
982 );
983 };
984 let host_owned = match (host_type, local_entity.is_host()) {
985 (HostType::Server, true) => panic!("Server-owned Entities cannot be unpublished!"),
986 (HostType::Client, false) => panic!("Server-owned Entities cannot be unpublished!"),
987 (HostType::Server, false) => false, (HostType::Client, true) => true, };
990 let command = EntityCommand::Unpublish(None, *global_entity);
991 if host_owned {
992 self.host.send_command(&self.entity_map, command);
993 } else {
994 self.remote
995 .send_auth_command(self.entity_map.entity_converter(), command);
996 }
997 }
998
999 pub fn send_enable_delegation(
1001 &mut self,
1002 host_type: HostType,
1003 origin_is_owning_client: bool,
1004 global_entity: &GlobalEntity,
1005 ) {
1006 let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
1011 panic!("Attempting to enable delegation for entity which does not exist in local entity map! {:?}", global_entity);
1012 };
1013 let host_owned = match (host_type, local_entity.is_host(), origin_is_owning_client) {
1014 (HostType::Server, false, true) => {
1015 panic!("Client cannot originate enable delegation for ANOTHER client-owned entity!")
1016 }
1017 (HostType::Client, _, false) => {
1018 panic!("Client must be the owning client to enable delegation!")
1019 }
1020 (HostType::Client, false, true) => {
1021 panic!("Client cannot enable delegation for a Server-owned entity")
1022 }
1023
1024 (HostType::Server, true, true) => true, (HostType::Server, true, false) => true, (HostType::Client, true, true) => true, (HostType::Server, false, false) => false, };
1029
1030 if host_owned {
1031 let host_entity = self
1033 .entity_map
1034 .global_entity_to_host_entity(global_entity)
1035 .expect("Host entity should exist");
1036
1037 let is_published = if let Some(channel) = self.get_host_entity_channel(&host_entity) {
1038 use crate::world::sync::auth_channel::EntityAuthChannelState;
1039 let state = channel.auth_channel_state();
1040 state == EntityAuthChannelState::Published
1041 || state == EntityAuthChannelState::Delegated
1042 } else {
1043 false
1044 };
1045
1046 if !is_published {
1048 let publish_command = EntityCommand::Publish(None, *global_entity);
1049 self.host.send_command(&self.entity_map, publish_command);
1050 }
1051
1052 #[cfg(feature = "e2e_debug")]
1054 crate::e2e_trace!(
1055 "[SERVER_SEND] EnableDelegation entity={:?} callsite=send_enable_delegation(host)",
1056 global_entity
1057 );
1058 let enable_delegation_command = EntityCommand::EnableDelegation(None, *global_entity);
1059 self.host
1060 .send_command(&self.entity_map, enable_delegation_command);
1061 } else {
1062 #[cfg(feature = "e2e_debug")]
1063 crate::e2e_trace!("[SERVER_SEND] EnableDelegation entity={:?} callsite=send_enable_delegation(remote)", global_entity);
1064 let command = EntityCommand::EnableDelegation(None, *global_entity);
1065 self.remote
1066 .send_auth_command(self.entity_map.entity_converter(), command);
1067 }
1068 }
1069
1070 #[track_caller]
1071 pub fn send_disable_delegation(&mut self, global_entity: &GlobalEntity) {
1073 #[cfg(feature = "e2e_debug")]
1074 {
1075 let caller = std::panic::Location::caller();
1076 crate::e2e_trace!(
1077 "[SERVER_SEND] DisableDelegation entity={:?} caller={}:{}",
1078 global_entity,
1079 caller.file(),
1080 caller.line()
1081 );
1082 }
1083 let command = EntityCommand::DisableDelegation(None, *global_entity);
1085 self.host.send_command(&self.entity_map, command);
1086 }
1087
1088 pub fn remote_send_release_auth(&mut self, global_entity: &GlobalEntity) {
1090 let command = EntityCommand::ReleaseAuthority(None, *global_entity);
1091
1092 let host_owned = self
1093 .entity_map
1094 .global_entity_to_owned_entity(global_entity)
1095 .unwrap()
1096 .is_host();
1097 if host_owned {
1098 self.host.send_command(&self.entity_map, command);
1099 } else {
1100 self.remote
1101 .send_auth_command(self.entity_map.entity_converter(), command);
1102 }
1103 }
1104
1105 pub fn collect_messages(&mut self, now: &Instant, rtt_millis: &f32) {
1109 self.handle_dropped_command_packets(now);
1110 self.updater.handle_dropped_update_packets(now, rtt_millis);
1111 }
1112
1113 fn handle_dropped_command_packets(&mut self, now: &Instant) {
1114 while let Some((_, (time_sent, _))) = self.sent_command_packets.front() {
1115 if time_sent.elapsed(now) > COMMAND_RECORD_TTL {
1116 self.sent_command_packets.pop_front();
1117 } else {
1118 break;
1119 }
1120 }
1121
1122 self.entity_map.cleanup_old_redirects(now, 60);
1124 }
1125
1126 pub fn take_outgoing_events<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
1128 &mut self,
1129 now: &Instant,
1130 rtt_millis: &f32,
1131 world: &W,
1132 converter: &dyn EntityAndGlobalEntityConverter<E>,
1133 global_world_manager: &dyn GlobalWorldManagerType,
1134 ) -> OutgoingEvents {
1135 #[cfg(feature = "bench_instrumentation")]
1137 let t = std::time::Instant::now();
1138 let host_commands = self.host.take_outgoing_commands();
1139 let remote_commands = self.remote.take_outgoing_commands();
1140 for commands in [host_commands, remote_commands] {
1141 for command in commands {
1142 self.sender.send_message(command);
1143 }
1144 }
1145 #[cfg(feature = "bench_instrumentation")]
1146 {
1147 use std::sync::atomic::Ordering;
1148 bench_take_events_counters::NS_HOST_REMOTE_COMMANDS
1149 .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
1150 }
1151
1152 #[cfg(feature = "bench_instrumentation")]
1153 let t = std::time::Instant::now();
1154 self.sender.collect_messages(now, rtt_millis);
1155 let world_commands = self.sender.take_next_messages();
1156 #[cfg(feature = "bench_instrumentation")]
1157 {
1158 use std::sync::atomic::Ordering;
1159 bench_take_events_counters::NS_SENDER_COLLECT
1160 .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
1161 }
1162
1163 #[cfg(feature = "bench_instrumentation")]
1165 let t = std::time::Instant::now();
1166 let update_events = self.take_update_events(world, converter, global_world_manager);
1167 #[cfg(feature = "bench_instrumentation")]
1168 {
1169 use std::sync::atomic::Ordering;
1170 bench_take_events_counters::NS_TAKE_UPDATE_EVENTS
1171 .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
1172 }
1173
1174 (world_commands, update_events)
1176 }
1177
1178 pub fn process_delivered_commands(&mut self) {
1180 self.host
1181 .process_delivered_commands(&mut self.entity_map, &mut self.updater);
1182 }
1183
1184 pub fn take_update_events<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
1186 &mut self,
1187 world: &W,
1188 world_converter: &dyn EntityAndGlobalEntityConverter<E>,
1189 global_world_manager: &dyn GlobalWorldManagerType,
1190 ) -> HashMap<GlobalEntity, HashSet<ComponentKind>> {
1191 let dirty = self.updater.build_dirty_candidates_from_receivers();
1192 let local_converter = self.entity_map.entity_converter();
1193 let mut updatable_world: HashMap<GlobalEntity, HashSet<ComponentKind>> = HashMap::new();
1194 for (global_entity, kinds) in dirty {
1195 if self.paused_entities.contains(&global_entity) {
1196 continue;
1197 }
1198 for kind in kinds {
1199 if self.host.is_component_updatable(local_converter, &global_entity, &kind)
1200 || self.remote.is_component_updatable(local_converter, &global_entity, &kind)
1201 {
1202 updatable_world.entry(global_entity).or_default().insert(kind);
1203 }
1204 }
1205 }
1206 self.updater.take_outgoing_events(world, world_converter, global_world_manager, updatable_world)
1207 }
1208
1209 pub fn get_message_processor_helpers(
1221 &mut self,
1222 ) -> (
1223 &dyn LocalEntityAndGlobalEntityConverter,
1224 &mut RemoteEntityWaitlist,
1225 ) {
1226 let entity_converter = self.entity_map.entity_converter();
1227 let entity_waitlist = self.remote.entity_waitlist_mut();
1228 (entity_converter, entity_waitlist)
1229 }
1230
1231 fn host_notify_packet_delivered(&mut self, packet_index: PacketIndex) {
1232 if let Some((_, command_list)) = self
1233 .sent_command_packets
1234 .remove_scan_from_front(&packet_index)
1235 {
1236 for (command_id, command) in command_list {
1237 if self.sender.deliver_message(&command_id).is_some() {
1238 self.deliver_message(command_id, command);
1239 }
1240 }
1241 }
1242 }
1243
1244 fn deliver_message(&mut self, id: CommandId, msg: EntityMessage<OwnedLocalEntity>) {
1245 if msg.is_noop() {
1246 return;
1247 }
1248 let Some(local_entity) = msg.entity() else {
1249 panic!("Delivered message without an entity! Message: {:?}", msg);
1250 };
1251 match local_entity {
1252 OwnedLocalEntity::Host { id: host_entity, is_static } => {
1253 let host_entity = if is_static { HostEntity::new_static(host_entity) } else { HostEntity::new(host_entity) };
1255 self.host.deliver_message(id, msg.with_entity(host_entity));
1256 }
1257 OwnedLocalEntity::Remote { .. } => {
1258 let remote_entity = local_entity.take_remote();
1260 self.remote
1261 .deliver_message(id, msg.with_entity(remote_entity));
1262 }
1263 }
1264 }
1265
1266 pub fn update_sent_command_entity_refs(
1268 &mut self,
1269 _global_entity: &GlobalEntity,
1270 old_entity: OwnedLocalEntity,
1271 new_entity: OwnedLocalEntity,
1272 ) {
1273 for (_, (_, commands)) in self.sent_command_packets.iter_mut() {
1275 for (_, message) in commands.iter_mut() {
1276 if let Some(entity) = message.entity() {
1277 if entity == old_entity {
1278 *message = message.clone().with_entity(new_entity);
1279 }
1280 }
1281 }
1282 }
1283 }
1284
1285 pub fn extract_host_entity_commands(
1287 &mut self,
1288 global_entity: &GlobalEntity,
1289 ) -> Vec<EntityCommand> {
1290 let host_entity = self
1292 .entity_map
1293 .global_entity_to_host_entity(global_entity)
1294 .unwrap();
1295 self.host.extract_entity_commands(&host_entity)
1297 }
1298
1299 pub fn extract_host_component_kinds(
1301 &self,
1302 global_entity: &GlobalEntity,
1303 ) -> HashSet<ComponentKind> {
1304 let host_entity = self
1306 .entity_map
1307 .global_entity_to_host_entity(global_entity)
1308 .unwrap();
1309 let channel = self.host.get_entity_channel(&host_entity).unwrap();
1311 channel.component_kinds().clone()
1313 }
1314
1315 pub fn remove_host_entity(&mut self, global_entity: &GlobalEntity) {
1317 let host_entity = self
1319 .entity_map
1320 .global_entity_to_host_entity(global_entity)
1321 .unwrap();
1322 self.host.remove_entity_channel(&host_entity);
1324 self.entity_map.remove_by_global_entity(global_entity);
1326 }
1327
1328 pub fn insert_remote_entity(
1330 &mut self,
1331 global_entity: &GlobalEntity,
1332 remote_entity: RemoteEntity,
1333 component_kinds: HashSet<ComponentKind>,
1334 ) {
1335 self.entity_map
1337 .insert_with_remote_entity(*global_entity, remote_entity);
1338
1339 if self.remote.has_entity_channel(&remote_entity) {
1340 info!(
1343 "RemoteEntity({:?}) channel already exists (likely from out-of-order SetAuthority). Upgrading to Delegated.",
1344 remote_entity
1345 );
1346 let channel = self.remote.get_entity_channel_mut(&remote_entity).unwrap();
1347
1348 channel.configure_as_delegated();
1350
1351 channel.set_spawned(0);
1354
1355 for component_kind in component_kinds {
1357 channel.insert_component_channel_as_inserted(component_kind, 0);
1358 }
1359 } else {
1360 let mut channel = RemoteEntityChannel::new_delegated(self.entity_map.host_type());
1362
1363 channel.set_spawned(0);
1365
1366 for component_kind in component_kinds {
1368 channel.insert_component_channel_as_inserted(component_kind, 0);
1369 }
1370
1371 self.remote.insert_entity_channel(remote_entity, channel);
1373 }
1374 }
1375
1376 pub fn install_entity_redirect(&mut self, old: OwnedLocalEntity, new: OwnedLocalEntity) {
1378 self.entity_map.install_entity_redirect(old, new);
1379 }
1380
1381 pub fn apply_entity_redirect(&self, entity: OwnedLocalEntity) -> OwnedLocalEntity {
1383 self.entity_map.apply_entity_redirect(&entity)
1384 }
1385
1386 pub fn replay_entity_command(&mut self, global_entity: &GlobalEntity, command: EntityCommand) {
1388 let _remote_entity = self
1390 .entity_map
1391 .global_entity_to_remote_entity(global_entity)
1392 .unwrap();
1393 self.remote.send_entity_command(&self.entity_map, command);
1394 }
1395}
1396
1397impl PacketNotifiable for LocalWorldManager {
1398 fn notify_packet_delivered(&mut self, packet_index: PacketIndex) {
1399 self.host_notify_packet_delivered(packet_index);
1400 self.updater.notify_packet_delivered(packet_index);
1401 }
1402}
1403
1404cfg_if! {
1405 if #[cfg(feature = "interior_visibility")] {
1406
1407 use crate::LocalEntity;
1408
1409 impl LocalWorldManager {
1410
1411 pub fn local_entities(&self) -> Vec<LocalEntity> {
1413 self.entity_map
1414 .iter()
1415 .map(|(_, record)| LocalEntity::from(record.owned_entity()))
1416 .collect::<Vec<LocalEntity>>()
1417 }
1418 }
1419 }
1420}
1421
1422#[cfg(feature = "test_utils")]
1423impl LocalWorldManager {
1424 #[doc(hidden)]
1425 pub fn diff_handler_receiver_count(&self) -> usize {
1426 self.updater.diff_handler_receiver_count()
1427 }
1428
1429 #[doc(hidden)]
1430 pub fn dirty_update_count(&self) -> usize {
1431 self.updater.dirty_candidates_len()
1432 }
1433}