naia_shared/world/remote/
remote_world_manager.rs

1use std::{
2    collections::{HashMap, HashSet},
3    hash::Hash,
4};
5
6use log::{info, warn};
7use naia_socket_shared::Instant;
8
9use crate::{
10    world::{
11        entity::local_entity::RemoteEntity,
12        local_world_manager::LocalWorldManager,
13        remote::{
14            entity_event::EntityEvent,
15            entity_waitlist::{EntityWaitlist, WaitlistHandle, WaitlistStore},
16            remote_world_reader::RemoteWorldEvents,
17        },
18    },
19    ComponentFieldUpdate, ComponentKind, ComponentKinds, ComponentUpdate, EntityAction,
20    EntityConverter, GlobalWorldManagerType, Replicate, Tick, WorldMutType,
21};
22
23pub struct RemoteWorldManager<E: Copy + Eq + Hash + Send + Sync> {
24    pub entity_waitlist: EntityWaitlist,
25    insert_waitlist_store: WaitlistStore<(E, Box<dyn Replicate>)>,
26    insert_waitlist_map: HashMap<(E, ComponentKind), WaitlistHandle>,
27    update_waitlist_store: WaitlistStore<(Tick, E, ComponentKind, ComponentFieldUpdate)>,
28    update_waitlist_map: HashMap<(E, ComponentKind), HashMap<u8, WaitlistHandle>>,
29    outgoing_events: Vec<EntityEvent<E>>,
30}
31
32impl<E: Copy + Eq + Hash + Send + Sync> RemoteWorldManager<E> {
33    pub fn new() -> Self {
34        Self {
35            entity_waitlist: EntityWaitlist::new(),
36            insert_waitlist_store: WaitlistStore::new(),
37            insert_waitlist_map: HashMap::new(),
38            update_waitlist_store: WaitlistStore::new(),
39            update_waitlist_map: HashMap::new(),
40            outgoing_events: Vec::new(),
41        }
42    }
43
44    pub fn on_entity_channel_opened(&mut self, remote_entity: &RemoteEntity) {
45        self.entity_waitlist.add_entity(remote_entity);
46    }
47
48    pub fn on_entity_channel_closing(&mut self, remote_entity: &RemoteEntity) {
49        self.entity_waitlist.remove_entity(remote_entity);
50    }
51
52    pub fn process_world_events<W: WorldMutType<E>>(
53        &mut self,
54        global_world_manager: &dyn GlobalWorldManagerType<E>,
55        local_world_manager: &mut LocalWorldManager<E>,
56        component_kinds: &ComponentKinds,
57        world: &mut W,
58        now: &Instant,
59        world_events: RemoteWorldEvents<E>,
60    ) -> Vec<EntityEvent<E>> {
61        self.process_updates(
62            global_world_manager,
63            local_world_manager,
64            component_kinds,
65            world,
66            now,
67            world_events.incoming_updates,
68        );
69        self.process_actions(
70            global_world_manager,
71            local_world_manager,
72            world,
73            now,
74            world_events.incoming_actions,
75            world_events.incoming_components,
76        );
77
78        std::mem::take(&mut self.outgoing_events)
79    }
80
81    /// Process incoming Entity actions.
82    ///
83    /// * Emits client events corresponding to any [`EntityAction`] received
84    /// Store
85    pub fn process_actions<W: WorldMutType<E>>(
86        &mut self,
87        global_world_manager: &dyn GlobalWorldManagerType<E>,
88        local_world_manager: &mut LocalWorldManager<E>,
89        world: &mut W,
90        now: &Instant,
91        incoming_actions: Vec<EntityAction<RemoteEntity>>,
92        incoming_components: HashMap<(RemoteEntity, ComponentKind), Box<dyn Replicate>>,
93    ) {
94        self.process_ready_actions(
95            global_world_manager,
96            local_world_manager,
97            world,
98            incoming_actions,
99            incoming_components,
100        );
101        self.process_waitlist_actions(global_world_manager, local_world_manager, world, now);
102    }
103
104    /// For each [`EntityAction`] that can be executed now,
105    /// execute it and emit a corresponding event.
106    fn process_ready_actions<W: WorldMutType<E>>(
107        &mut self,
108        global_world_manager: &dyn GlobalWorldManagerType<E>,
109        local_world_manager: &mut LocalWorldManager<E>,
110        world: &mut W,
111        incoming_actions: Vec<EntityAction<RemoteEntity>>,
112        mut incoming_components: HashMap<(RemoteEntity, ComponentKind), Box<dyn Replicate>>,
113    ) {
114        // execute the action and emit an event
115        for action in incoming_actions {
116            match action {
117                EntityAction::SpawnEntity(remote_entity, components) => {
118                    // set up entity
119                    let world_entity = world.spawn_entity();
120                    local_world_manager.insert_remote_entity(&world_entity, remote_entity);
121
122                    self.outgoing_events
123                        .push(EntityEvent::<E>::SpawnEntity(world_entity));
124
125                    // read component list
126                    for component_kind in components {
127                        let component = incoming_components
128                            .remove(&(remote_entity, component_kind))
129                            .unwrap();
130
131                        self.process_insert(world, world_entity, component, &component_kind);
132                    }
133                }
134                EntityAction::DespawnEntity(remote_entity) => {
135                    let world_entity = local_world_manager.remove_by_remote_entity(&remote_entity);
136
137                    // Generate event for each component, handing references off just in
138                    // case
139                    if let Some(component_kinds) =
140                        global_world_manager.component_kinds(&world_entity)
141                    {
142                        for component_kind in component_kinds {
143                            self.process_remove(world, world_entity, component_kind);
144                        }
145                    }
146
147                    world.despawn_entity(&world_entity);
148
149                    self.on_entity_channel_closing(&remote_entity);
150
151                    self.outgoing_events
152                        .push(EntityEvent::<E>::DespawnEntity(world_entity));
153                }
154                EntityAction::InsertComponent(remote_entity, component_kind) => {
155                    let component = incoming_components
156                        .remove(&(remote_entity, component_kind))
157                        .unwrap();
158
159                    if local_world_manager.has_remote_entity(&remote_entity) {
160                        let world_entity =
161                            local_world_manager.world_entity_from_remote(&remote_entity);
162
163                        self.process_insert(world, world_entity, component, &component_kind);
164                    } else {
165                        // entity may have despawned on disconnect or something similar?
166                        warn!("received InsertComponent message for nonexistant entity");
167                    }
168                }
169                EntityAction::RemoveComponent(remote_entity, component_kind) => {
170                    let world_entity = local_world_manager.world_entity_from_remote(&remote_entity);
171                    self.process_remove(world, world_entity, component_kind);
172                }
173                EntityAction::Noop => {
174                    // do nothing
175                }
176            }
177        }
178    }
179
180    fn process_insert<W: WorldMutType<E>>(
181        &mut self,
182        world: &mut W,
183        world_entity: E,
184        component: Box<dyn Replicate>,
185        component_kind: &ComponentKind,
186    ) {
187        if let Some(entity_set) = component.relations_waiting() {
188            let handle = self.entity_waitlist.queue(
189                &entity_set,
190                &mut self.insert_waitlist_store,
191                (world_entity, component),
192            );
193            self.insert_waitlist_map
194                .insert((world_entity, *component_kind), handle);
195        } else {
196            self.finish_insert(world, world_entity, component, component_kind);
197        }
198    }
199
200    fn finish_insert<W: WorldMutType<E>>(
201        &mut self,
202        world: &mut W,
203        world_entity: E,
204        component: Box<dyn Replicate>,
205        component_kind: &ComponentKind,
206    ) {
207        world.insert_boxed_component(&world_entity, component);
208
209        self.outgoing_events.push(EntityEvent::<E>::InsertComponent(
210            world_entity,
211            *component_kind,
212        ));
213    }
214
215    fn process_remove<W: WorldMutType<E>>(
216        &mut self,
217        world: &mut W,
218        world_entity: E,
219        component_kind: ComponentKind,
220    ) {
221        // Remove from insert waitlist if it's there
222        if let Some(handle) = self
223            .insert_waitlist_map
224            .remove(&(world_entity, component_kind))
225        {
226            self.insert_waitlist_store.remove(&handle);
227            self.entity_waitlist.remove_waiting_handle(&handle);
228            return;
229        }
230        // Remove Component from update waitlist if it's there
231        if let Some(handle_map) = self
232            .update_waitlist_map
233            .remove(&(world_entity, component_kind))
234        {
235            for (_index, handle) in handle_map {
236                self.update_waitlist_store.remove(&handle);
237                self.entity_waitlist.remove_waiting_handle(&handle);
238            }
239            return;
240        }
241        // Remove from world
242        if let Some(component) = world.remove_component_of_kind(&world_entity, &component_kind) {
243            // Send out event
244            self.outgoing_events
245                .push(EntityEvent::<E>::RemoveComponent(world_entity, component));
246        }
247    }
248
249    fn process_waitlist_actions<W: WorldMutType<E>>(
250        &mut self,
251        global_world_manager: &dyn GlobalWorldManagerType<E>,
252        local_world_manager: &mut LocalWorldManager<E>,
253        world: &mut W,
254        now: &Instant,
255    ) {
256        if let Some(list) = self
257            .entity_waitlist
258            .collect_ready_items(now, &mut self.insert_waitlist_store)
259        {
260            let converter = EntityConverter::new(
261                global_world_manager.to_global_entity_converter(),
262                local_world_manager,
263            );
264
265            for (world_entity, mut component) in list {
266                let component_kind = component.kind();
267
268                self.insert_waitlist_map
269                    .remove(&(world_entity, component_kind));
270
271                {
272                    component.relations_complete(&converter);
273                }
274
275                self.finish_insert(world, world_entity, component, &component_kind);
276            }
277        }
278    }
279
280    /// Process incoming Entity updates.
281    ///
282    /// * Emits client events corresponding to any [`EntityAction`] received
283    /// Store
284    pub fn process_updates<W: WorldMutType<E>>(
285        &mut self,
286        global_world_manager: &dyn GlobalWorldManagerType<E>,
287        local_world_manager: &mut LocalWorldManager<E>,
288        component_kinds: &ComponentKinds,
289        world: &mut W,
290        now: &Instant,
291        incoming_updates: Vec<(Tick, E, ComponentUpdate)>,
292    ) {
293        self.process_ready_updates(
294            global_world_manager,
295            local_world_manager,
296            component_kinds,
297            world,
298            incoming_updates,
299        );
300        self.process_waitlist_updates(global_world_manager, local_world_manager, world, now);
301    }
302
303    /// Process component updates from raw bits for a given entity
304    fn process_ready_updates<W: WorldMutType<E>>(
305        &mut self,
306        global_world_manager: &dyn GlobalWorldManagerType<E>,
307        local_world_manager: &LocalWorldManager<E>,
308        component_kinds: &ComponentKinds,
309        world: &mut W,
310        mut incoming_updates: Vec<(Tick, E, ComponentUpdate)>,
311    ) {
312        let converter = EntityConverter::new(
313            global_world_manager.to_global_entity_converter(),
314            local_world_manager,
315        );
316        for (tick, world_entity, component_update) in incoming_updates.drain(..) {
317            let component_kind = component_update.kind;
318
319            // split the component_update into the waiting and ready parts
320            let Ok((waiting_updates_opt, ready_update_opt)) =
321                component_update.split_into_waiting_and_ready(&converter, component_kinds)
322            else {
323                warn!("Remote World Manager: cannot read malformed component update message");
324                continue;
325            };
326
327            if waiting_updates_opt.is_some() && ready_update_opt.is_some() {
328                warn!("Incoming Update split into BOTH waiting and ready parts");
329            }
330            if waiting_updates_opt.is_some() && ready_update_opt.is_none() {
331                warn!("Incoming Update split into ONLY waiting part");
332            }
333            if waiting_updates_opt.is_none() && ready_update_opt.is_some() {
334                // warn!("Incoming Update split into ONLY ready part");
335            }
336            if waiting_updates_opt.is_none() && ready_update_opt.is_none() {
337                panic!("Incoming Update split into NEITHER waiting nor ready parts. This should not happen.");
338            }
339
340            // if it exists, queue the waiting part of the component update
341            if let Some(waiting_updates) = waiting_updates_opt {
342                for (waiting_entity, waiting_field_update) in waiting_updates {
343                    let field_id = waiting_field_update.field_id();
344
345                    // Have to convert the single waiting entity to a HashSet ..
346                    // TODO: make this more efficient
347                    let mut waiting_entities = HashSet::new();
348                    waiting_entities.insert(waiting_entity);
349
350                    let handle = self.entity_waitlist.queue(
351                        &waiting_entities,
352                        &mut self.update_waitlist_store,
353                        (tick, world_entity, component_kind, waiting_field_update),
354                    );
355                    let component_field_key = (world_entity, component_kind);
356                    if !self.update_waitlist_map.contains_key(&component_field_key) {
357                        self.update_waitlist_map
358                            .insert(component_field_key, HashMap::new());
359                    }
360                    let handle_map = self
361                        .update_waitlist_map
362                        .get_mut(&component_field_key)
363                        .unwrap();
364                    if let Some(old_handle) = handle_map.get(&field_id) {
365                        self.update_waitlist_store.remove(&handle);
366                        self.entity_waitlist.remove_waiting_handle(old_handle);
367                    }
368                    handle_map.insert(field_id, handle);
369                }
370            }
371            // if it exists, apply the ready part of the component update
372            if let Some(ready_update) = ready_update_opt {
373                if world
374                    .component_apply_update(
375                        &converter,
376                        &world_entity,
377                        &component_kind,
378                        ready_update,
379                    )
380                    .is_err()
381                {
382                    warn!("Remote World Manager: cannot read malformed component update message");
383                    continue;
384                }
385
386                self.outgoing_events.push(EntityEvent::UpdateComponent(
387                    tick,
388                    world_entity,
389                    component_kind,
390                ));
391            }
392        }
393    }
394
395    fn process_waitlist_updates<W: WorldMutType<E>>(
396        &mut self,
397        global_world_manager: &dyn GlobalWorldManagerType<E>,
398        local_world_manager: &LocalWorldManager<E>,
399        world: &mut W,
400        now: &Instant,
401    ) {
402        let converter = EntityConverter::new(
403            global_world_manager.to_global_entity_converter(),
404            local_world_manager,
405        );
406        if let Some(list) = self
407            .entity_waitlist
408            .collect_ready_items(now, &mut self.update_waitlist_store)
409        {
410            for (tick, world_entity, component_kind, ready_update) in list {
411                info!("processing waiting update!");
412
413                let component_key = (world_entity, component_kind);
414                let mut remove_entry = false;
415                if let Some(component_map) = self.update_waitlist_map.get_mut(&component_key) {
416                    component_map.remove(&ready_update.field_id());
417                    if component_map.is_empty() {
418                        remove_entry = true;
419                    }
420                }
421                if remove_entry {
422                    self.update_waitlist_map.remove(&component_key);
423                }
424
425                if world
426                    .component_apply_field_update(
427                        &converter,
428                        &world_entity,
429                        &component_kind,
430                        ready_update,
431                    )
432                    .is_err()
433                {
434                    warn!("Remote World Manager: cannot read malformed complete waitlisted component update message");
435                    continue;
436                }
437
438                self.outgoing_events.push(EntityEvent::<E>::UpdateComponent(
439                    tick,
440                    world_entity,
441                    component_kind,
442                ));
443            }
444        }
445    }
446}