1use std::{
2 collections::{HashMap, HashSet},
3 hash::Hash,
4};
5
6use log::warn;
7
8use naia_socket_shared::Instant;
9
10use crate::{
11 world::{
12 entity::in_scope_entities::InScopeEntities,
13 entity_event::EntityEvent,
14 host::host_world_manager::CommandId,
15 local::local_entity::RemoteEntity,
16 remote::{
17 remote_entity_waitlist::{RemoteEntityWaitlist, WaitlistStore},
18 remote_world_waitlist::RemoteWorldWaitlist,
19 },
20 sync::{RemoteEngine, RemoteEntityChannel},
21 },
22 ComponentKind, ComponentKinds, ComponentUpdate, EntityAndGlobalEntityConverter,
23 EntityAuthStatus, EntityCommand, EntityMessage, EntityMessageReceiver, GlobalEntity,
24 GlobalEntitySpawner, GlobalWorldManagerType, HostType, LocalEntityAndGlobalEntityConverter,
25 LocalEntityMap, MessageIndex, OwnedLocalEntity, Replicate, Tick, WorldMutType,
26};
27
28cfg_if! {
29 if #[cfg(feature = "e2e_debug")] {
30 use crate::world::{
31 host::host_world_manager::SubCommandId,
32 sync::remote_entity_channel::EntityChannelState,
33 };
34 use crate::EntityMessageType;
35 }
36}
37
38pub struct RemoteWorldManager {
40 remote_engine: RemoteEngine<RemoteEntity>,
43
44 authed_entities_opt: Option<HashSet<RemoteEntity>>,
47
48 incoming_events: Vec<EntityEvent>,
50 waitlist: RemoteWorldWaitlist,
51 }
53
54impl RemoteWorldManager {
55 pub fn new(host_type: HostType) -> Self {
57 let delegated_world_opt = if host_type == HostType::Client {
58 Some(HashSet::new())
59 } else {
60 None
61 };
62 Self {
63 remote_engine: RemoteEngine::new(host_type),
64 authed_entities_opt: delegated_world_opt,
65 incoming_events: Vec::new(),
66 waitlist: RemoteWorldWaitlist::new(),
67 }
68 }
69
70 pub(crate) fn deliver_message(
71 &mut self,
72 _command_id: CommandId,
73 _message: EntityMessage<RemoteEntity>,
74 ) {
75 }
77
78 pub(crate) fn entity_waitlist_queue<T>(
79 &mut self,
80 remote_entity_set: &HashSet<RemoteEntity>,
81 waitlist_store: &mut WaitlistStore<T>,
82 message: T,
83 ) {
84 self.waitlist.entity_waitlist_mut().queue(
85 &self.remote_engine,
86 remote_entity_set,
87 waitlist_store,
88 message,
89 );
90 }
91
92 pub fn entity_waitlist(&self) -> &RemoteEntityWaitlist {
94 self.waitlist.entity_waitlist()
95 }
96
97 pub fn entity_waitlist_mut(&mut self) -> &mut RemoteEntityWaitlist {
99 self.waitlist.entity_waitlist_mut()
100 }
101
102 pub(crate) fn register_authed_entity(&mut self, remote_entity: &RemoteEntity) {
103 let Some(authed_entities) = self.authed_entities_opt.as_mut() else {
104 return;
105 };
106
107 authed_entities.insert(*remote_entity);
108 }
109
/// Debug helper (only built with the `e2e_debug` feature): returns the
/// auth diagnostic reported by the channel of `remote_entity`.
///
/// Returns `None` when the engine has no channel for that entity. The tuple
/// is passed through unchanged from `debug_auth_diagnostic`; see that method
/// for the meaning of the individual fields.
#[cfg(feature = "e2e_debug")]
pub fn debug_channel_diagnostic(
    &self,
    remote_entity: &RemoteEntity,
) -> Option<(
    EntityChannelState,
    (SubCommandId, usize, Option<SubCommandId>, usize),
)> {
    let channel = self.remote_engine.get_world().get(remote_entity)?;
    Some(channel.debug_auth_diagnostic())
}
123
/// Debug helper (only built with the `e2e_debug` feature): returns the
/// snapshot reported by the channel of `remote_entity`.
///
/// Returns `None` when the engine has no channel for that entity. The tuple
/// is passed through unchanged from the channel's `debug_channel_snapshot`;
/// see that method for the meaning of the individual fields.
#[cfg(feature = "e2e_debug")]
pub fn debug_channel_snapshot(
    &self,
    remote_entity: &RemoteEntity,
) -> Option<(
    EntityChannelState,
    Option<MessageIndex>,
    usize,
    Option<(MessageIndex, EntityMessageType)>,
    Option<MessageIndex>,
)> {
    let channel = self.remote_engine.get_world().get(remote_entity)?;
    Some(channel.debug_channel_snapshot())
}
140
141 pub(crate) fn deregister_authed_entity(&mut self, remote_entity: &RemoteEntity) {
142 let Some(authed_entities) = self.authed_entities_opt.as_mut() else {
143 return;
144 };
145
146 authed_entities.remove(remote_entity);
147 }
148
149 pub(crate) fn is_component_updatable(
150 &self,
151 local_converter: &dyn LocalEntityAndGlobalEntityConverter,
152 global_entity: &GlobalEntity,
153 kind: &ComponentKind,
154 ) -> bool {
155 let Some(authed_entities) = self.authed_entities_opt.as_ref() else {
156 return false;
157 };
158 let Ok(remote_entity) = local_converter.global_entity_to_remote_entity(global_entity) else {
159 return false;
160 };
161 if !authed_entities.contains(&remote_entity) {
162 return false;
163 }
164 let Some(remote_channel) = self.remote_engine.get_world().get(&remote_entity) else {
165 return false;
166 };
167 remote_channel.has_component_kind(kind)
168 }
169
170 pub fn take_outgoing_commands(&mut self) -> Vec<EntityCommand> {
172 self.remote_engine.take_outgoing_commands()
173 }
174
175 pub fn send_entity_command(
177 &mut self,
178 converter: &dyn LocalEntityAndGlobalEntityConverter,
179 command: EntityCommand,
180 ) {
181 let global_entity = command.entity();
182 let Ok(remote_entity) = converter.global_entity_to_remote_entity(&global_entity) else {
185 warn!(
186 "send_entity_command: entity {:?} no longer exists (likely out of scope), skipping",
187 global_entity
188 );
189 return;
190 };
191 self.remote_engine
192 .send_entity_command(remote_entity, command);
193 }
194
195 pub(crate) fn send_auth_command(
196 &mut self,
197 converter: &dyn LocalEntityAndGlobalEntityConverter,
198 command: EntityCommand,
199 ) {
200 let global_entity = command.entity();
201 let Ok(remote_entity) = converter.global_entity_to_remote_entity(&global_entity) else {
204 warn!(
205 "send_auth_command: entity {:?} no longer exists (likely out of scope), skipping",
206 global_entity
207 );
208 return;
209 };
210 self.remote_engine.send_auth_command(remote_entity, command);
211 }
212
213 pub(crate) fn receive_set_auth_status(
215 &mut self,
216 remote_entity: RemoteEntity,
217 auth_status: EntityAuthStatus,
218 ) {
219 self.remote_engine
220 .receive_set_auth_status(remote_entity, auth_status);
221 }
222
223 pub fn spawn_entity(
225 &mut self,
226 entity: &RemoteEntity,
228 ) {
229 self.waitlist.spawn_entity(&self.remote_engine, entity);
230 }
231
232 pub fn despawn_entity(
234 &mut self,
235 _local_entity_map: &mut LocalEntityMap,
236 entity: &RemoteEntity,
237 ) {
238 self.waitlist.despawn_entity(entity);
239 }
240
241 #[allow(clippy::too_many_arguments)]
243 pub fn take_incoming_events<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
244 &mut self,
245 spawner: &mut dyn GlobalEntitySpawner<E>,
246 global_world_manager: &dyn GlobalWorldManagerType,
247 local_entity_map: &mut LocalEntityMap,
248 component_kinds: &ComponentKinds,
249 world: &mut W,
250 now: &Instant,
251 incoming_components: &mut HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
252 incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
253 incoming_messages: Vec<(MessageIndex, EntityMessage<RemoteEntity>)>,
254 ) -> Vec<EntityEvent> {
255 let incoming_messages = EntityMessageReceiver::remote_take_incoming_messages(
256 &mut self.remote_engine,
257 incoming_messages,
258 );
259
260 self.process_updates(
261 local_entity_map.entity_converter(),
262 spawner.to_converter(),
263 component_kinds,
264 world,
265 now,
266 incoming_updates,
267 );
268 self.process_incoming_messages(
269 spawner,
270 global_world_manager,
271 local_entity_map,
272 world,
273 now,
274 incoming_components,
275 incoming_messages,
276 );
277
278 std::mem::take(&mut self.incoming_events)
279 }
280
281 #[allow(clippy::too_many_arguments)]
282 fn process_incoming_messages<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
283 &mut self,
284 spawner: &mut dyn GlobalEntitySpawner<E>,
285 global_world_manager: &dyn GlobalWorldManagerType,
286 local_entity_map: &mut LocalEntityMap,
287 world: &mut W,
288 now: &Instant,
289 incoming_components: &mut HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
290 incoming_messages: Vec<EntityMessage<RemoteEntity>>,
291 ) {
292 self.process_ready_messages(
293 spawner,
294 global_world_manager,
295 local_entity_map,
296 world,
297 incoming_components,
298 incoming_messages,
299 );
300 let world_converter = spawner.to_converter();
301 self.process_waitlist_messages(
302 local_entity_map.entity_converter(),
303 world_converter,
304 world,
305 now,
306 );
307 }
308
309 fn process_ready_messages<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
312 &mut self,
313 spawner: &mut dyn GlobalEntitySpawner<E>,
314 global_world_manager: &dyn GlobalWorldManagerType,
315 local_entity_map: &mut LocalEntityMap,
316 world: &mut W,
317 incoming_components: &mut HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
318 incoming_messages: Vec<EntityMessage<RemoteEntity>>,
319 ) {
320 for message in incoming_messages {
322 match message {
324 EntityMessage::Spawn(remote_entity) => {
325 let world_entity = world.spawn_entity();
327 let global_entity = spawner.spawn(world_entity, Some(remote_entity));
328 if local_entity_map.contains_remote_entity(&remote_entity) {
329 } else {
331 local_entity_map.insert_with_remote_entity(global_entity, remote_entity);
332 }
333
334 self.incoming_events.push(EntityEvent::Spawn(global_entity));
335 }
336 EntityMessage::Despawn(remote_entity) => {
337 let global_entity = local_entity_map.remove_by_remote_entity(&remote_entity);
338 let world_entity = spawner.global_entity_to_entity(&global_entity).unwrap();
339
340 if let Some(component_kinds) =
343 global_world_manager.component_kinds(&global_entity)
344 {
345 for component_kind in component_kinds {
346 self.process_remove(
347 world,
348 local_entity_map,
349 &remote_entity,
350 &world_entity,
351 &component_kind,
352 );
353 }
354 }
355
356 world.despawn_entity(&world_entity);
357
358 self.incoming_events
359 .push(EntityEvent::Despawn(global_entity));
360 }
361 EntityMessage::InsertComponent(remote_entity, component_kind) => {
362 let local_entity = remote_entity.copy_to_owned();
363 let component = incoming_components
364 .remove(&(local_entity, component_kind))
365 .unwrap();
366
367 if local_entity_map.contains_remote_entity(&remote_entity) {
368 let global_entity = *local_entity_map
369 .global_entity_from_remote(&remote_entity)
370 .unwrap();
371 let world_entity = spawner.global_entity_to_entity(&global_entity).unwrap();
372
373 self.process_insert(
374 world,
375 local_entity_map,
376 &remote_entity,
377 &world_entity,
378 component,
379 &component_kind,
380 );
381 } else {
382 warn!("received InsertComponent message for nonexistant entity");
384 }
385 }
386 EntityMessage::RemoveComponent(remote_entity, component_kind) => {
387 let global_entity = local_entity_map
388 .global_entity_from_remote(&remote_entity)
389 .unwrap();
390 let world_entity = spawner.global_entity_to_entity(global_entity).unwrap();
391 self.process_remove(
392 world,
393 local_entity_map,
394 &remote_entity,
395 &world_entity,
396 &component_kind,
397 );
398 }
399 EntityMessage::Noop => {
400 }
402 EntityMessage::SetAuthority(_, remote_entity, auth_status) => {
403 self.remote_engine.receive_set_auth_status(remote_entity, auth_status);
405 let Some(global_entity) = local_entity_map.global_entity_from_remote(&remote_entity) else {
406 continue;
407 };
408 self.incoming_events.push(EntityEvent::SetAuthority(*global_entity, auth_status));
409 }
410 msg => {
411 let event = msg.to_event(local_entity_map);
413 self.incoming_events.push(event);
414 }
415 }
416 }
417 }
418
419 fn process_insert<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
420 &mut self,
421 world: &mut W,
422 converter: &dyn LocalEntityAndGlobalEntityConverter,
423 entity: &RemoteEntity,
424 world_entity: &E,
425 component: Box<dyn Replicate>,
426 component_kind: &ComponentKind,
427 ) {
428 if let Some(remote_entity_set) = component.relations_waiting() {
429
430 self.waitlist.waitlist_queue_entity(
431 &self.remote_engine,
432 entity,
433 component,
434 component_kind,
435 &remote_entity_set,
436 );
437 } else {
438 self.finish_insert(
439 world,
440 converter,
441 entity,
442 world_entity,
443 component,
444 component_kind,
445 );
446 }
447 }
448
449 fn finish_insert<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
450 &mut self,
451 world: &mut W,
452 converter: &dyn LocalEntityAndGlobalEntityConverter,
453 entity: &RemoteEntity,
454 world_entity: &E,
455 component: Box<dyn Replicate>,
456 component_kind: &ComponentKind,
457 ) {
458 world.insert_boxed_component(world_entity, component);
465
466 let global_entity = converter.remote_entity_to_global_entity(entity).unwrap();
467
468 self.incoming_events
469 .push(EntityEvent::InsertComponent(global_entity, *component_kind));
470 }
471
472 fn process_remove<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
473 &mut self,
474 world: &mut W,
475 converter: &dyn LocalEntityAndGlobalEntityConverter,
476 entity: &RemoteEntity,
477 world_entity: &E,
478 component_kind: &ComponentKind,
479 ) {
480 if self.waitlist.process_remove(entity, component_kind) {
481 return;
482 }
483 if let Some(component) = world.remove_component_of_kind(world_entity, component_kind) {
485 if let Ok(global_entity) = converter.remote_entity_to_global_entity(entity) {
487 self.incoming_events
488 .push(EntityEvent::RemoveComponent(global_entity, component));
489 }
490 }
491 }
492
493 fn process_waitlist_messages<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
494 &mut self,
495 local_converter: &dyn LocalEntityAndGlobalEntityConverter,
496 world_converter: &dyn EntityAndGlobalEntityConverter<E>,
497 world: &mut W,
498 now: &Instant,
499 ) {
500 for (entity, component_kind, component) in
501 self.waitlist.entities_to_insert(now, local_converter)
502 {
503 let global_entity = local_converter
504 .remote_entity_to_global_entity(&entity)
505 .unwrap();
506 let world_entity = world_converter
507 .global_entity_to_entity(&global_entity)
508 .unwrap();
509 self.finish_insert(
510 world,
511 local_converter,
512 &entity,
513 &world_entity,
514 component,
515 &component_kind,
516 );
517 }
518 }
519
520 fn process_updates<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
521 &mut self,
522 local_converter: &dyn LocalEntityAndGlobalEntityConverter,
523 world_converter: &dyn EntityAndGlobalEntityConverter<E>,
524 component_kinds: &ComponentKinds,
525 world: &mut W,
526 now: &Instant,
527 incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
528 ) {
529 self.process_ready_updates(
530 local_converter,
531 world_converter,
532 component_kinds,
533 world,
534 incoming_updates,
535 );
536 self.process_waitlist_updates(local_converter, world_converter, world, now);
537 }
538
539 fn process_ready_updates<WE: Copy + Eq + Hash + Send + Sync, W: WorldMutType<WE>>(
541 &mut self,
542 local_converter: &dyn LocalEntityAndGlobalEntityConverter,
543 world_converter: &dyn EntityAndGlobalEntityConverter<WE>,
544 component_kinds: &ComponentKinds,
545 world: &mut W,
546 incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
547 ) {
548 for (tick, local_entity, component_kind) in self.waitlist.process_ready_updates(
549 &self.remote_engine,
550 local_converter,
551 world_converter,
552 component_kinds,
553 world,
554 incoming_updates,
555 ) {
556 let global_entity = local_converter
557 .owned_entity_to_global_entity(&local_entity)
558 .unwrap();
559 self.incoming_events.push(EntityEvent::UpdateComponent(
560 tick,
561 global_entity,
562 component_kind,
563 ));
564 }
565 }
566
567 fn process_waitlist_updates<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
568 &mut self,
569 local_converter: &dyn LocalEntityAndGlobalEntityConverter,
570 world_converter: &dyn EntityAndGlobalEntityConverter<E>,
571 world: &mut W,
572 now: &Instant,
573 ) {
574 for (tick, remote_entity, component_kind) in
575 self.waitlist
576 .process_waitlist_updates(local_converter, world_converter, world, now)
577 {
578 let global_entity = local_converter
579 .remote_entity_to_global_entity(&remote_entity)
580 .unwrap();
581 self.incoming_events.push(EntityEvent::UpdateComponent(
582 tick,
583 global_entity,
584 component_kind,
585 ));
586 }
587 }
588
589 pub(crate) fn force_drain_entity_buffers(&mut self, remote_entity: &RemoteEntity) {
590 let Some(channel) = self.remote_engine.get_world_mut().get_mut(remote_entity) else {
591 panic!("Cannot force-drain non-existent entity");
592 };
593 channel.force_drain_all_buffers();
594 }
595
596 pub(crate) fn extract_component_kinds(
597 &self,
598 remote_entity: &RemoteEntity,
599 ) -> HashSet<ComponentKind> {
600 let Some(channel) = self.remote_engine.get_world().get(remote_entity) else {
601 panic!("Cannot extract component kinds from non-existent entity");
602 };
603 channel.extract_inserted_component_kinds()
604 }
605
606 pub(crate) fn remove_entity_channel(
607 &mut self,
608 remote_entity: &RemoteEntity,
609 ) -> RemoteEntityChannel {
610 self.remote_engine.remove_entity_channel(remote_entity)
611 }
612
613 pub(crate) fn insert_entity_channel(
614 &mut self,
615 remote_entity: RemoteEntity,
616 channel: RemoteEntityChannel,
617 ) {
618 self.remote_engine
619 .insert_entity_channel(remote_entity, channel);
620 }
621
622 pub(crate) fn has_entity_channel(&self, remote_entity: &RemoteEntity) -> bool {
623 self.remote_engine.has_entity(remote_entity)
624 }
625
626 pub(crate) fn get_entity_channel_mut(
627 &mut self,
628 remote_entity: &RemoteEntity,
629 ) -> Option<&mut RemoteEntityChannel> {
630 self.remote_engine.get_entity_channel_mut(remote_entity)
631 }
632
633 pub fn get_entity_auth_status(&self, entity: &RemoteEntity) -> Option<EntityAuthStatus> {
635 self.remote_engine.get_entity_auth_status(entity)
636 }
637}
638
639impl InScopeEntities<RemoteEntity> for RemoteWorldManager {
640 fn has_entity(&self, entity: &RemoteEntity) -> bool {
641 self.remote_engine.has_entity(entity)
642 }
643}