Skip to main content

naia_shared/world/
world_reader.rs

1use crate::world::local::local_entity::RemoteEntity;
2use crate::world::local::local_world_manager::LocalWorldManager;
3use crate::{
4    messages::channels::receivers::indexed_message_reader::IndexedMessageReader,
5    world::host::host_world_manager::SubCommandId, BitReader, ComponentKind, ComponentKinds,
6    EntityAuthStatus, EntityMessage, EntityMessageType, HostEntity, MessageIndex, OwnedLocalEntity,
7    Serde, SerdeErr, Tick,
8};
9
10/// Stateless helper that deserializes entity update and message payloads from a bit stream into a [`LocalWorldManager`].
11pub struct WorldReader;
12
13impl WorldReader {
14    // Reading
15
16    fn read_message_index(
17        reader: &mut BitReader,
18        last_index_opt: &mut Option<MessageIndex>,
19    ) -> Result<MessageIndex, SerdeErr> {
20        // read index
21        let current_index = IndexedMessageReader::read_message_index(reader, last_index_opt)?;
22
23        *last_index_opt = Some(current_index);
24
25        Ok(current_index)
26    }
27
28    /// Deserializes both component updates and entity messages from `reader` into `world_manager`.
29    pub fn read_world_events(
30        world_manager: &mut LocalWorldManager,
31        component_kinds: &ComponentKinds,
32        tick: &Tick,
33        reader: &mut BitReader,
34    ) -> Result<(), SerdeErr> {
35        // read entity updates
36        Self::read_updates(world_manager, component_kinds, tick, reader)?;
37
38        // read entity messages
39        Self::read_messages(world_manager, component_kinds, reader)?;
40
41        Ok(())
42    }
43
44    /// Read incoming Entity messages.
45    fn read_messages(
46        world_manager: &mut LocalWorldManager,
47        component_kinds: &ComponentKinds,
48        reader: &mut BitReader,
49    ) -> Result<(), SerdeErr> {
50        let mut last_read_id: Option<MessageIndex> = None;
51
52        loop {
53            // read message continue bit
54            let message_continue = bool::de(reader)?;
55            if !message_continue {
56                break;
57            }
58
59            Self::read_message(world_manager, component_kinds, reader, &mut last_read_id)?;
60        }
61
62        Ok(())
63    }
64
65    /// Read the bits corresponding to the EntityMessage and adds the [`EntityMessage`]
66    /// to an internal buffer.
67    ///
68    /// We can use a UnorderedReliableReceiver buffer because the messages have already been
69    /// ordered by the client's jitter buffer
70    fn read_message(
71        world_manager: &mut LocalWorldManager,
72        component_kinds: &ComponentKinds,
73        reader: &mut BitReader,
74        last_read_id: &mut Option<MessageIndex>,
75    ) -> Result<(), SerdeErr> {
76        let message_id = Self::read_message_index(reader, last_read_id)?;
77
78        let message_type = EntityMessageType::de(reader)?;
79
80        match message_type {
81            EntityMessageType::Spawn => {
82                // Count when Spawn message KIND is recognized on wire (before routing)
83                #[cfg(feature = "e2e_debug")]
84                {
85                    extern "Rust" {
86                        fn client_saw_spawn_increment();
87                    }
88                    // Safety: client_saw_spawn_increment is defined by the naia-tests harness
89                    // when compiled with feature = "e2e_debug". It atomically increments a
90                    // counter and has no preconditions. The e2e_debug feature is never enabled
91                    // in production builds.
92                    unsafe {
93                        client_saw_spawn_increment();
94                    }
95                }
96
97                // read remote entity
98                let remote_entity = RemoteEntity::de(reader)?;
99
100                world_manager.receiver_buffer_message(
101                    message_id,
102                    EntityMessage::Spawn(remote_entity.copy_to_owned()),
103                );
104            }
105            EntityMessageType::SpawnWithComponents => {
106                // read entity as full OwnedLocalEntity (carries is_static) then reverse to Remote
107                let local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
108
109                // read component count
110                let count = u8::de(reader)?;
111
112                let mut kinds = Vec::with_capacity(count as usize);
113                for _ in 0..count {
114                    let converter = world_manager.entity_converter();
115                    let new_component = component_kinds.read(reader, converter)?;
116                    let new_component_kind = new_component.kind();
117                    world_manager.insert_received_component(
118                        &local_entity,
119                        &new_component_kind,
120                        new_component,
121                    );
122                    kinds.push(new_component_kind);
123                }
124
125                world_manager.receiver_buffer_message(
126                    message_id,
127                    EntityMessage::SpawnWithComponents(local_entity, kinds),
128                );
129            }
130            EntityMessageType::Despawn => {
131                // read local entity
132                let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
133                // apply redirect if entity was migrated
134                local_entity = world_manager.apply_entity_redirect(local_entity);
135
136                world_manager
137                    .receiver_buffer_message(message_id, EntityMessage::Despawn(local_entity));
138            }
139            EntityMessageType::InsertComponent => {
140                // read local entity
141                let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
142                // apply redirect if entity was migrated
143                local_entity = world_manager.apply_entity_redirect(local_entity);
144
145                // read component
146                let converter = world_manager.entity_converter();
147                let new_component = component_kinds.read(reader, converter)?;
148                let new_component_kind = new_component.kind();
149
150                world_manager.receiver_buffer_message(
151                    message_id,
152                    EntityMessage::InsertComponent(local_entity, new_component_kind),
153                );
154                world_manager.insert_received_component(
155                    &local_entity,
156                    &new_component_kind,
157                    new_component,
158                );
159            }
160            EntityMessageType::RemoveComponent => {
161                // read local entity
162                let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
163                // apply redirect if entity was migrated
164                local_entity = world_manager.apply_entity_redirect(local_entity);
165
166                // read component kind
167                let component_kind = ComponentKind::de(component_kinds, reader)?;
168
169                world_manager.receiver_buffer_message(
170                    message_id,
171                    EntityMessage::RemoveComponent(local_entity, component_kind),
172                );
173            }
174            EntityMessageType::Publish => {
175                // read subcommand id
176                let sub_command_id = SubCommandId::de(reader)?;
177
178                // read local entity
179                let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
180                // apply redirect if entity was migrated
181                local_entity = world_manager.apply_entity_redirect(local_entity);
182
183                world_manager.receiver_buffer_message(
184                    message_id,
185                    EntityMessage::Publish(sub_command_id, local_entity),
186                );
187            }
188            EntityMessageType::Unpublish => {
189                // read subcommand id
190                let sub_command_id = SubCommandId::de(reader)?;
191
192                // read local entity
193                let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
194                // apply redirect if entity was migrated
195                local_entity = world_manager.apply_entity_redirect(local_entity);
196
197                world_manager.receiver_buffer_message(
198                    message_id,
199                    EntityMessage::Unpublish(sub_command_id, local_entity),
200                );
201            }
202            EntityMessageType::EnableDelegation => {
203                // read subcommand id
204                let sub_command_id = SubCommandId::de(reader)?;
205
206                // read local entity
207                let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
208                // apply redirect if entity was migrated
209                local_entity = world_manager.apply_entity_redirect(local_entity);
210
211                world_manager.receiver_buffer_message(
212                    message_id,
213                    EntityMessage::EnableDelegation(sub_command_id, local_entity),
214                );
215            }
216            EntityMessageType::DisableDelegation => {
217                // this command is only ever received by clients, regarding server-owned entities
218
219                // read subcommand id
220                let sub_command_id = SubCommandId::de(reader)?;
221
222                // read remote entity
223                let remote_entity = RemoteEntity::de(reader)?;
224
225                world_manager.receiver_buffer_message(
226                    message_id,
227                    EntityMessage::DisableDelegation(sub_command_id, remote_entity.copy_to_owned()),
228                );
229            }
230            EntityMessageType::SetAuthority => {
231                // this command is only ever received by clients, regarding server-owned entities
232
233                // Count when SetAuthority message KIND is recognized on wire (before entity mapping)
234                #[cfg(feature = "e2e_debug")]
235                {
236                    extern "Rust" {
237                        fn client_saw_set_auth_wire_increment();
238                    }
239                    // Safety: same as client_saw_spawn_increment above — atomic counter defined
240                    // by the test harness; no preconditions; e2e_debug is never active in prod.
241                    unsafe {
242                        client_saw_set_auth_wire_increment();
243                    }
244                }
245
246                // read subcommand id
247                let sub_command_id = SubCommandId::de(reader)?;
248
249                // read remote entity
250                let remote_entity = RemoteEntity::de(reader)?;
251
252                // read auth status
253                let auth_status = EntityAuthStatus::de(reader)?;
254
255                world_manager.receiver_buffer_message(
256                    message_id,
257                    EntityMessage::SetAuthority(
258                        sub_command_id,
259                        remote_entity.copy_to_owned(),
260                        auth_status,
261                    ),
262                );
263            }
264
265            // below are response-type messages
266            EntityMessageType::RequestAuthority => {
267                // this command is only read by the server, regarding server-owned entities
268
269                // read subcommand id
270                let sub_command_id = SubCommandId::de(reader)?;
271
272                // read host entity
273                let host_entity = HostEntity::de(reader)?;
274
275                world_manager.receiver_buffer_message(
276                    message_id,
277                    EntityMessage::RequestAuthority(sub_command_id, host_entity.copy_to_owned()),
278                );
279            }
280            EntityMessageType::ReleaseAuthority => {
281                // this command is only read by the server, regarding server-owned entities
282
283                // read subcommand id
284                let sub_command_id = SubCommandId::de(reader)?;
285
286                // read local entity
287                let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
288                // apply redirect if entity was migrated
289                local_entity = world_manager.apply_entity_redirect(local_entity);
290
291                world_manager.receiver_buffer_message(
292                    message_id,
293                    EntityMessage::ReleaseAuthority(sub_command_id, local_entity),
294                );
295            }
296            EntityMessageType::EnableDelegationResponse => {
297                // this command is only read by the server, regarding server-owned entities
298
299                // read subcommand id
300                let sub_command_id = SubCommandId::de(reader)?;
301
302                // read host entity
303                let host_entity = HostEntity::de(reader)?;
304
305                world_manager.receiver_buffer_message(
306                    message_id,
307                    EntityMessage::EnableDelegationResponse(
308                        sub_command_id,
309                        host_entity.copy_to_owned(),
310                    ),
311                );
312            }
313            EntityMessageType::MigrateResponse => {
314                // this command is only ever received by clients, regarding newly delegated server-owned entities
315
316                // read subcommand id
317                let sub_command_id = SubCommandId::de(reader)?;
318
319                // read client's HostEntity (so client can look it up in entity_map!)
320                let client_host_entity = HostEntity::de(reader)?;
321
322                // read new RemoteEntity (what the client will create)
323                let new_remote_entity = RemoteEntity::de(reader)?;
324
325                world_manager.receiver_buffer_message(
326                    message_id,
327                    EntityMessage::MigrateResponse(
328                        sub_command_id,
329                        client_host_entity.copy_to_owned(),
330                        new_remote_entity,
331                    ),
332                );
333            }
334            EntityMessageType::Noop => {
335                world_manager.receiver_buffer_message(message_id, EntityMessage::Noop);
336            }
337        }
338
339        Ok(())
340    }
341
342    /// Read component updates from raw bits
343    fn read_updates(
344        world_manager: &mut LocalWorldManager,
345        component_kinds: &ComponentKinds,
346        tick: &Tick,
347        reader: &mut BitReader,
348    ) -> Result<(), SerdeErr> {
349        let mut _update_count = 0;
350        loop {
351            // read update continue bit
352            let update_continue = bool::de(reader)?;
353            if !update_continue {
354                break;
355            }
356            _update_count += 1;
357
358            let mut local_entity = OwnedLocalEntity::de(reader)?.to_reversed();
359            // apply redirect if entity was migrated
360            local_entity = world_manager.apply_entity_redirect(local_entity);
361
362            Self::read_update(world_manager, component_kinds, tick, reader, &local_entity)?;
363        }
364
365        Ok(())
366    }
367
368    /// Read component updates from raw bits for a given entity
369    fn read_update(
370        world_manager: &mut LocalWorldManager,
371        component_kinds: &ComponentKinds,
372        tick: &Tick,
373        reader: &mut BitReader,
374        local_entity: &OwnedLocalEntity,
375    ) -> Result<(), SerdeErr> {
376        loop {
377            // read update continue bit
378            let component_continue = bool::de(reader)?;
379            if !component_continue {
380                break;
381            }
382
383            let component_update = component_kinds.read_create_update(reader)?;
384
385            // At this point, the WorldChannel/EntityReceiver should guarantee the Entity is in scope, correct?
386            if world_manager.has_local_entity(local_entity) {
387                world_manager.insert_received_update(*tick, local_entity, component_update);
388            }
389        }
390
391        Ok(())
392    }
393}