Skip to main content

naia_shared/world/remote/
remote_world_manager.rs

1use std::{
2    collections::{HashMap, HashSet},
3    hash::Hash,
4};
5
6use log::warn;
7
8use naia_socket_shared::Instant;
9
10use crate::{
11    world::{
12        entity::in_scope_entities::InScopeEntities,
13        entity_event::EntityEvent,
14        host::host_world_manager::CommandId,
15        local::local_entity::RemoteEntity,
16        remote::{
17            remote_entity_waitlist::{RemoteEntityWaitlist, WaitlistStore},
18            remote_world_waitlist::RemoteWorldWaitlist,
19        },
20        sync::{RemoteEngine, RemoteEntityChannel},
21    },
22    ComponentKind, ComponentKinds, ComponentUpdate, EntityAndGlobalEntityConverter,
23    EntityAuthStatus, EntityCommand, EntityMessage, EntityMessageReceiver, GlobalEntity,
24    GlobalEntitySpawner, GlobalWorldManagerType, HostType, LocalEntityAndGlobalEntityConverter,
25    LocalEntityMap, MessageIndex, OwnedLocalEntity, Replicate, Tick, WorldMutType,
26};
27
28cfg_if! {
29    if #[cfg(feature = "e2e_debug")] {
30        use crate::world::{
31            host::host_world_manager::SubCommandId,
32            sync::remote_entity_channel::EntityChannelState,
33        };
34        use crate::EntityMessageType;
35    }
36}
37
38/// Manages the inbound side of entity replication — entities whose authoritative state comes from the remote peer.
39pub struct RemoteWorldManager {
40    // For Server, this contains the Entities that have been received from the Client, that the Client has authority over.
41    // For Client, this contains the Entities that have been received from the Server, that the Server has authority over.
42    remote_engine: RemoteEngine<RemoteEntity>,
43
44    // For Server, this is None
45    // For Client, it reflects the delegated RemoteEntities it has temporary authority over
46    authed_entities_opt: Option<HashSet<RemoteEntity>>,
47
48    // incoming messages
49    incoming_events: Vec<EntityEvent>,
50    waitlist: RemoteWorldWaitlist,
51    // outgoing messages
52}
53
54impl RemoteWorldManager {
55    /// Creates a `RemoteWorldManager` for the given `host_type` side of a connection.
56    pub fn new(host_type: HostType) -> Self {
57        let delegated_world_opt = if host_type == HostType::Client {
58            Some(HashSet::new())
59        } else {
60            None
61        };
62        Self {
63            remote_engine: RemoteEngine::new(host_type),
64            authed_entities_opt: delegated_world_opt,
65            incoming_events: Vec::new(),
66            waitlist: RemoteWorldWaitlist::new(),
67        }
68    }
69
70    pub(crate) fn deliver_message(
71        &mut self,
72        _command_id: CommandId,
73        _message: EntityMessage<RemoteEntity>,
74    ) {
75        // so far, it seems like we don't need to do anything specific when delivering a remote-entity message.. we'll see
76    }
77
78    pub(crate) fn entity_waitlist_queue<T>(
79        &mut self,
80        remote_entity_set: &HashSet<RemoteEntity>,
81        waitlist_store: &mut WaitlistStore<T>,
82        message: T,
83    ) {
84        self.waitlist.entity_waitlist_mut().queue(
85            &self.remote_engine,
86            remote_entity_set,
87            waitlist_store,
88            message,
89        );
90    }
91
92    /// Returns a shared reference to the entity waitlist.
93    pub fn entity_waitlist(&self) -> &RemoteEntityWaitlist {
94        self.waitlist.entity_waitlist()
95    }
96
97    /// Returns a mutable reference to the entity waitlist.
98    pub fn entity_waitlist_mut(&mut self) -> &mut RemoteEntityWaitlist {
99        self.waitlist.entity_waitlist_mut()
100    }
101
102    pub(crate) fn register_authed_entity(&mut self, remote_entity: &RemoteEntity) {
103        let Some(authed_entities) = self.authed_entities_opt.as_mut() else {
104            return;
105        };
106
107        authed_entities.insert(*remote_entity);
108    }
109
110    #[cfg(feature = "e2e_debug")]
111    pub fn debug_channel_diagnostic(
112        &self,
113        remote_entity: &RemoteEntity,
114    ) -> Option<(
115        EntityChannelState,
116        (SubCommandId, usize, Option<SubCommandId>, usize),
117    )> {
118        self.remote_engine
119            .get_world()
120            .get(remote_entity)
121            .map(|channel| channel.debug_auth_diagnostic())
122    }
123
124    #[cfg(feature = "e2e_debug")]
125    pub fn debug_channel_snapshot(
126        &self,
127        remote_entity: &RemoteEntity,
128    ) -> Option<(
129        EntityChannelState,
130        Option<MessageIndex>,
131        usize,
132        Option<(MessageIndex, EntityMessageType)>,
133        Option<MessageIndex>,
134    )> {
135        self.remote_engine
136            .get_world()
137            .get(remote_entity)
138            .map(|channel| channel.debug_channel_snapshot())
139    }
140
141    pub(crate) fn deregister_authed_entity(&mut self, remote_entity: &RemoteEntity) {
142        let Some(authed_entities) = self.authed_entities_opt.as_mut() else {
143            return;
144        };
145
146        authed_entities.remove(remote_entity);
147    }
148
149    pub(crate) fn is_component_updatable(
150        &self,
151        local_converter: &dyn LocalEntityAndGlobalEntityConverter,
152        global_entity: &GlobalEntity,
153        kind: &ComponentKind,
154    ) -> bool {
155        let Some(authed_entities) = self.authed_entities_opt.as_ref() else {
156            return false;
157        };
158        let Ok(remote_entity) = local_converter.global_entity_to_remote_entity(global_entity) else {
159            return false;
160        };
161        if !authed_entities.contains(&remote_entity) {
162            return false;
163        }
164        let Some(remote_channel) = self.remote_engine.get_world().get(&remote_entity) else {
165            return false;
166        };
167        remote_channel.has_component_kind(kind)
168    }
169
170    /// Drains and returns all pending outbound [`EntityCommand`]s from the remote engine.
171    pub fn take_outgoing_commands(&mut self) -> Vec<EntityCommand> {
172        self.remote_engine.take_outgoing_commands()
173    }
174
175    /// Enqueues `command` for the entity identified in `command` via the remote engine, silently skipping if the entity no longer exists.
176    pub fn send_entity_command(
177        &mut self,
178        converter: &dyn LocalEntityAndGlobalEntityConverter,
179        command: EntityCommand,
180    ) {
181        let global_entity = command.entity();
182        // Entity may no longer exist if it went out of scope before this command
183        // was processed. In that case, the command is no longer relevant - silently skip.
184        let Ok(remote_entity) = converter.global_entity_to_remote_entity(&global_entity) else {
185            warn!(
186                "send_entity_command: entity {:?} no longer exists (likely out of scope), skipping",
187                global_entity
188            );
189            return;
190        };
191        self.remote_engine
192            .send_entity_command(remote_entity, command);
193    }
194
195    pub(crate) fn send_auth_command(
196        &mut self,
197        converter: &dyn LocalEntityAndGlobalEntityConverter,
198        command: EntityCommand,
199    ) {
200        let global_entity = command.entity();
201        // Entity may no longer exist if it went out of scope before this auth command
202        // was processed. In that case, the command is no longer relevant - silently skip.
203        let Ok(remote_entity) = converter.global_entity_to_remote_entity(&global_entity) else {
204            warn!(
205                "send_auth_command: entity {:?} no longer exists (likely out of scope), skipping",
206                global_entity
207            );
208            return;
209        };
210        self.remote_engine.send_auth_command(remote_entity, command);
211    }
212
213    /// Update authority status in RemoteEntityChannel (used after migration)
214    pub(crate) fn receive_set_auth_status(
215        &mut self,
216        remote_entity: RemoteEntity,
217        auth_status: EntityAuthStatus,
218    ) {
219        self.remote_engine
220            .receive_set_auth_status(remote_entity, auth_status);
221    }
222
223    /// Notifies the waitlist that `entity` has been spawned, unblocking any queued operations.
224    pub fn spawn_entity(
225        &mut self,
226        // converter: &dyn LocalEntityAndGlobalEntityConverter,
227        entity: &RemoteEntity,
228    ) {
229        self.waitlist.spawn_entity(&self.remote_engine, entity);
230    }
231
232    /// Removes `entity` from the waitlist tracking structures.
233    pub fn despawn_entity(
234        &mut self,
235        _local_entity_map: &mut LocalEntityMap,
236        entity: &RemoteEntity,
237    ) {
238        self.waitlist.despawn_entity(entity);
239    }
240
241    /// Processes all buffered incoming messages and updates, applying them to `world` and returning the resulting [`EntityEvent`]s.
242    #[allow(clippy::too_many_arguments)]
243    pub fn take_incoming_events<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
244        &mut self,
245        spawner: &mut dyn GlobalEntitySpawner<E>,
246        global_world_manager: &dyn GlobalWorldManagerType,
247        local_entity_map: &mut LocalEntityMap,
248        component_kinds: &ComponentKinds,
249        world: &mut W,
250        now: &Instant,
251        incoming_components: &mut HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
252        incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
253        incoming_messages: Vec<(MessageIndex, EntityMessage<RemoteEntity>)>,
254    ) -> Vec<EntityEvent> {
255        let incoming_messages = EntityMessageReceiver::remote_take_incoming_messages(
256            &mut self.remote_engine,
257            incoming_messages,
258        );
259
260        self.process_updates(
261            local_entity_map.entity_converter(),
262            spawner.to_converter(),
263            component_kinds,
264            world,
265            now,
266            incoming_updates,
267        );
268        self.process_incoming_messages(
269            spawner,
270            global_world_manager,
271            local_entity_map,
272            world,
273            now,
274            incoming_components,
275            incoming_messages,
276        );
277
278        std::mem::take(&mut self.incoming_events)
279    }
280
281    #[allow(clippy::too_many_arguments)]
282    fn process_incoming_messages<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
283        &mut self,
284        spawner: &mut dyn GlobalEntitySpawner<E>,
285        global_world_manager: &dyn GlobalWorldManagerType,
286        local_entity_map: &mut LocalEntityMap,
287        world: &mut W,
288        now: &Instant,
289        incoming_components: &mut HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
290        incoming_messages: Vec<EntityMessage<RemoteEntity>>,
291    ) {
292        self.process_ready_messages(
293            spawner,
294            global_world_manager,
295            local_entity_map,
296            world,
297            incoming_components,
298            incoming_messages,
299        );
300        let world_converter = spawner.to_converter();
301        self.process_waitlist_messages(
302            local_entity_map.entity_converter(),
303            world_converter,
304            world,
305            now,
306        );
307    }
308
309    /// For each [`EntityMessage`] that can be executed now,
310    /// execute it and emit a corresponding event.
311    fn process_ready_messages<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
312        &mut self,
313        spawner: &mut dyn GlobalEntitySpawner<E>,
314        global_world_manager: &dyn GlobalWorldManagerType,
315        local_entity_map: &mut LocalEntityMap,
316        world: &mut W,
317        incoming_components: &mut HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
318        incoming_messages: Vec<EntityMessage<RemoteEntity>>,
319    ) {
320        // execute the action and emit an event
321        for message in incoming_messages {
322            // info!("Processing EntityMessage: {:?}", message);
323            match message {
324                EntityMessage::Spawn(remote_entity) => {
325                    // set up entity
326                    let world_entity = world.spawn_entity();
327                    let global_entity = spawner.spawn(world_entity, Some(remote_entity));
328                    if local_entity_map.contains_remote_entity(&remote_entity) {
329                        // mapped remote entity already when reserving global entity
330                    } else {
331                        local_entity_map.insert_with_remote_entity(global_entity, remote_entity);
332                    }
333
334                    self.incoming_events.push(EntityEvent::Spawn(global_entity));
335                }
336                EntityMessage::Despawn(remote_entity) => {
337                    let global_entity = local_entity_map.remove_by_remote_entity(&remote_entity);
338                    let world_entity = spawner.global_entity_to_entity(&global_entity).unwrap();
339
340                    // Generate event for each component, handing references off just in
341                    // case
342                    if let Some(component_kinds) =
343                        global_world_manager.component_kinds(&global_entity)
344                    {
345                        for component_kind in component_kinds {
346                            self.process_remove(
347                                world,
348                                local_entity_map,
349                                &remote_entity,
350                                &world_entity,
351                                &component_kind,
352                            );
353                        }
354                    }
355
356                    world.despawn_entity(&world_entity);
357
358                    self.incoming_events
359                        .push(EntityEvent::Despawn(global_entity));
360                }
361                EntityMessage::InsertComponent(remote_entity, component_kind) => {
362                    let local_entity = remote_entity.copy_to_owned();
363                    let component = incoming_components
364                        .remove(&(local_entity, component_kind))
365                        .unwrap();
366
367                    if local_entity_map.contains_remote_entity(&remote_entity) {
368                        let global_entity = *local_entity_map
369                            .global_entity_from_remote(&remote_entity)
370                            .unwrap();
371                        let world_entity = spawner.global_entity_to_entity(&global_entity).unwrap();
372
373                        self.process_insert(
374                            world,
375                            local_entity_map,
376                            &remote_entity,
377                            &world_entity,
378                            component,
379                            &component_kind,
380                        );
381                    } else {
382                        // entity may have despawned on disconnect or something similar?
383                        warn!("received InsertComponent message for nonexistant entity");
384                    }
385                }
386                EntityMessage::RemoveComponent(remote_entity, component_kind) => {
387                    let global_entity = local_entity_map
388                        .global_entity_from_remote(&remote_entity)
389                        .unwrap();
390                    let world_entity = spawner.global_entity_to_entity(global_entity).unwrap();
391                    self.process_remove(
392                        world,
393                        local_entity_map,
394                        &remote_entity,
395                        &world_entity,
396                        &component_kind,
397                    );
398                }
399                EntityMessage::Noop => {
400                    // do nothing
401                }
402                EntityMessage::SetAuthority(_, remote_entity, auth_status) => {
403                    // Update the stored auth status so get_entity_auth_status() reflects the new value
404                    self.remote_engine.receive_set_auth_status(remote_entity, auth_status);
405                    let Some(global_entity) = local_entity_map.global_entity_from_remote(&remote_entity) else {
406                        continue;
407                    };
408                    self.incoming_events.push(EntityEvent::SetAuthority(*global_entity, auth_status));
409                }
410                msg => {
411                    // let msg_type = msg.get_type();
412                    let event = msg.to_event(local_entity_map);
413                    self.incoming_events.push(event);
414                }
415            }
416        }
417    }
418
419    fn process_insert<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
420        &mut self,
421        world: &mut W,
422        converter: &dyn LocalEntityAndGlobalEntityConverter,
423        entity: &RemoteEntity,
424        world_entity: &E,
425        component: Box<dyn Replicate>,
426        component_kind: &ComponentKind,
427    ) {
428        if let Some(remote_entity_set) = component.relations_waiting() {
429
430            self.waitlist.waitlist_queue_entity(
431                &self.remote_engine,
432                entity,
433                component,
434                component_kind,
435                &remote_entity_set,
436            );
437        } else {
438            self.finish_insert(
439                world,
440                converter,
441                entity,
442                world_entity,
443                component,
444                component_kind,
445            );
446        }
447    }
448
449    fn finish_insert<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
450        &mut self,
451        world: &mut W,
452        converter: &dyn LocalEntityAndGlobalEntityConverter,
453        entity: &RemoteEntity,
454        world_entity: &E,
455        component: Box<dyn Replicate>,
456        component_kind: &ComponentKind,
457    ) {
458        // let name = component.name();
459        // info!(
460        //     "Remote World Manager: finish inserting component {:?} for entity {:?}",
461        //     &name, global_entity
462        // );
463
464        world.insert_boxed_component(world_entity, component);
465
466        let global_entity = converter.remote_entity_to_global_entity(entity).unwrap();
467
468        self.incoming_events
469            .push(EntityEvent::InsertComponent(global_entity, *component_kind));
470    }
471
472    fn process_remove<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
473        &mut self,
474        world: &mut W,
475        converter: &dyn LocalEntityAndGlobalEntityConverter,
476        entity: &RemoteEntity,
477        world_entity: &E,
478        component_kind: &ComponentKind,
479    ) {
480        if self.waitlist.process_remove(entity, component_kind) {
481            return;
482        }
483        // Remove from world
484        if let Some(component) = world.remove_component_of_kind(world_entity, component_kind) {
485            // Send out event
486            if let Ok(global_entity) = converter.remote_entity_to_global_entity(entity) {
487                self.incoming_events
488                    .push(EntityEvent::RemoveComponent(global_entity, component));
489            }
490        }
491    }
492
493    fn process_waitlist_messages<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
494        &mut self,
495        local_converter: &dyn LocalEntityAndGlobalEntityConverter,
496        world_converter: &dyn EntityAndGlobalEntityConverter<E>,
497        world: &mut W,
498        now: &Instant,
499    ) {
500        for (entity, component_kind, component) in
501            self.waitlist.entities_to_insert(now, local_converter)
502        {
503            let global_entity = local_converter
504                .remote_entity_to_global_entity(&entity)
505                .unwrap();
506            let world_entity = world_converter
507                .global_entity_to_entity(&global_entity)
508                .unwrap();
509            self.finish_insert(
510                world,
511                local_converter,
512                &entity,
513                &world_entity,
514                component,
515                &component_kind,
516            );
517        }
518    }
519
520    fn process_updates<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
521        &mut self,
522        local_converter: &dyn LocalEntityAndGlobalEntityConverter,
523        world_converter: &dyn EntityAndGlobalEntityConverter<E>,
524        component_kinds: &ComponentKinds,
525        world: &mut W,
526        now: &Instant,
527        incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
528    ) {
529        self.process_ready_updates(
530            local_converter,
531            world_converter,
532            component_kinds,
533            world,
534            incoming_updates,
535        );
536        self.process_waitlist_updates(local_converter, world_converter, world, now);
537    }
538
539    /// Process component updates from raw bits for a given entity
540    fn process_ready_updates<WE: Copy + Eq + Hash + Send + Sync, W: WorldMutType<WE>>(
541        &mut self,
542        local_converter: &dyn LocalEntityAndGlobalEntityConverter,
543        world_converter: &dyn EntityAndGlobalEntityConverter<WE>,
544        component_kinds: &ComponentKinds,
545        world: &mut W,
546        incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
547    ) {
548        for (tick, local_entity, component_kind) in self.waitlist.process_ready_updates(
549            &self.remote_engine,
550            local_converter,
551            world_converter,
552            component_kinds,
553            world,
554            incoming_updates,
555        ) {
556            let global_entity = local_converter
557                .owned_entity_to_global_entity(&local_entity)
558                .unwrap();
559            self.incoming_events.push(EntityEvent::UpdateComponent(
560                tick,
561                global_entity,
562                component_kind,
563            ));
564        }
565    }
566
567    fn process_waitlist_updates<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
568        &mut self,
569        local_converter: &dyn LocalEntityAndGlobalEntityConverter,
570        world_converter: &dyn EntityAndGlobalEntityConverter<E>,
571        world: &mut W,
572        now: &Instant,
573    ) {
574        for (tick, remote_entity, component_kind) in
575            self.waitlist
576                .process_waitlist_updates(local_converter, world_converter, world, now)
577        {
578            let global_entity = local_converter
579                .remote_entity_to_global_entity(&remote_entity)
580                .unwrap();
581            self.incoming_events.push(EntityEvent::UpdateComponent(
582                tick,
583                global_entity,
584                component_kind,
585            ));
586        }
587    }
588
589    pub(crate) fn force_drain_entity_buffers(&mut self, remote_entity: &RemoteEntity) {
590        let Some(channel) = self.remote_engine.get_world_mut().get_mut(remote_entity) else {
591            panic!("Cannot force-drain non-existent entity");
592        };
593        channel.force_drain_all_buffers();
594    }
595
596    pub(crate) fn extract_component_kinds(
597        &self,
598        remote_entity: &RemoteEntity,
599    ) -> HashSet<ComponentKind> {
600        let Some(channel) = self.remote_engine.get_world().get(remote_entity) else {
601            panic!("Cannot extract component kinds from non-existent entity");
602        };
603        channel.extract_inserted_component_kinds()
604    }
605
606    pub(crate) fn remove_entity_channel(
607        &mut self,
608        remote_entity: &RemoteEntity,
609    ) -> RemoteEntityChannel {
610        self.remote_engine.remove_entity_channel(remote_entity)
611    }
612
613    pub(crate) fn insert_entity_channel(
614        &mut self,
615        remote_entity: RemoteEntity,
616        channel: RemoteEntityChannel,
617    ) {
618        self.remote_engine
619            .insert_entity_channel(remote_entity, channel);
620    }
621
622    pub(crate) fn has_entity_channel(&self, remote_entity: &RemoteEntity) -> bool {
623        self.remote_engine.has_entity(remote_entity)
624    }
625
626    pub(crate) fn get_entity_channel_mut(
627        &mut self,
628        remote_entity: &RemoteEntity,
629    ) -> Option<&mut RemoteEntityChannel> {
630        self.remote_engine.get_entity_channel_mut(remote_entity)
631    }
632
633    /// Returns the current authority status for `entity`'s remote channel, if one exists.
634    pub fn get_entity_auth_status(&self, entity: &RemoteEntity) -> Option<EntityAuthStatus> {
635        self.remote_engine.get_entity_auth_status(entity)
636    }
637}
638
639impl InScopeEntities<RemoteEntity> for RemoteWorldManager {
640    fn has_entity(&self, entity: &RemoteEntity) -> bool {
641        self.remote_engine.has_entity(entity)
642    }
643}