Skip to main content

naia_shared/world/local/
local_world_manager.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    hash::Hash,
4    net::SocketAddr,
5    time::Duration,
6};
7
8/// Fine-grained timing inside `take_outgoing_events` — see Phase 4 diagnostic
9/// `examples/phase4_tick_internals.rs`. Counters are cumulative across all
10/// user connections within a tick; reset/snapshot from the bench harness.
11#[cfg(feature = "bench_instrumentation")]
12pub mod bench_take_events_counters {
13    use std::sync::atomic::{AtomicU64, Ordering};
14    #[doc(hidden)] pub static NS_HOST_REMOTE_COMMANDS: AtomicU64 = AtomicU64::new(0);
15    #[doc(hidden)] pub static NS_SENDER_COLLECT: AtomicU64 = AtomicU64::new(0);
16    #[doc(hidden)] pub static NS_TAKE_UPDATE_EVENTS: AtomicU64 = AtomicU64::new(0);
17
18    /// Resets all counters to zero.
19    pub fn reset() {
20        NS_HOST_REMOTE_COMMANDS.store(0, Ordering::Relaxed);
21        NS_SENDER_COLLECT.store(0, Ordering::Relaxed);
22        NS_TAKE_UPDATE_EVENTS.store(0, Ordering::Relaxed);
23    }
24    /// Returns a snapshot of all counters as a tuple.
25    pub fn snapshot() -> (u64, u64, u64) {
26        (
27            NS_HOST_REMOTE_COMMANDS.load(Ordering::Relaxed),
28            NS_SENDER_COLLECT.load(Ordering::Relaxed),
29            NS_TAKE_UPDATE_EVENTS.load(Ordering::Relaxed),
30        )
31    }
32}
33
34/// Phase 6 — per-`EntityMessageType` emission counters. Incremented exactly
35/// when a command is genuinely committed to a packet (the `is_writing == true`
36/// path in `WorldWriter::write_command`, via `record_command_written`). Sized
37/// to answer the audit question: "PaintRect of N tiles → does the wire carry
38/// N `SpawnWithComponents`, or does it degenerate to N `Spawn` + N×K
39/// `InsertComponent`?" Enabled via `bench_instrumentation`.
40#[cfg(feature = "bench_instrumentation")]
41pub mod cmd_emission_counters {
42    use std::sync::atomic::{AtomicU64, Ordering};
43
44    #[doc(hidden)] pub static SPAWN: AtomicU64 = AtomicU64::new(0);
45    #[doc(hidden)] pub static SPAWN_WITH_COMPONENTS: AtomicU64 = AtomicU64::new(0);
46    #[doc(hidden)] pub static DESPAWN: AtomicU64 = AtomicU64::new(0);
47    #[doc(hidden)] pub static INSERT_COMPONENT: AtomicU64 = AtomicU64::new(0);
48    #[doc(hidden)] pub static REMOVE_COMPONENT: AtomicU64 = AtomicU64::new(0);
49    #[doc(hidden)] pub static NOOP: AtomicU64 = AtomicU64::new(0);
50    #[doc(hidden)] pub static OTHER: AtomicU64 = AtomicU64::new(0);
51
52    /// `SpawnWithComponents` payload total — sum of `Vec<ComponentKind>.len()`
53    /// across emissions. Cross-checks N×K accounting against the spawn-count
54    /// audit (a coalesced PaintRect of N tiles with K components each should
55    /// emit `spawn_with_components == N` AND `payload_components == N*K`).
56    pub static PAYLOAD_COMPONENTS: AtomicU64 = AtomicU64::new(0);
57
58    /// Resets all emission counters to zero.
59    pub fn reset() {
60        SPAWN.store(0, Ordering::Relaxed);
61        SPAWN_WITH_COMPONENTS.store(0, Ordering::Relaxed);
62        DESPAWN.store(0, Ordering::Relaxed);
63        INSERT_COMPONENT.store(0, Ordering::Relaxed);
64        REMOVE_COMPONENT.store(0, Ordering::Relaxed);
65        NOOP.store(0, Ordering::Relaxed);
66        OTHER.store(0, Ordering::Relaxed);
67        PAYLOAD_COMPONENTS.store(0, Ordering::Relaxed);
68    }
69
70    /// Snapshot of all per-command-type emission counters.
71    #[derive(Debug, Clone, Copy)]
72    pub struct CmdEmissionSnapshot {
73        /// Number of `Spawn` commands emitted.
74        pub spawn: u64,
75        /// Number of `SpawnWithComponents` commands emitted.
76        pub spawn_with_components: u64,
77        /// Number of `Despawn` commands emitted.
78        pub despawn: u64,
79        /// Number of `InsertComponent` commands emitted.
80        pub insert_component: u64,
81        /// Number of `RemoveComponent` commands emitted.
82        pub remove_component: u64,
83        /// Number of no-op commands emitted.
84        pub noop: u64,
85        /// Number of other commands emitted.
86        pub other: u64,
87        /// Total component payloads across all `SpawnWithComponents` commands.
88        pub payload_components: u64,
89    }
90
91    /// Returns a snapshot of all emission counters.
92    pub fn snapshot() -> CmdEmissionSnapshot {
93        CmdEmissionSnapshot {
94            spawn: SPAWN.load(Ordering::Relaxed),
95            spawn_with_components: SPAWN_WITH_COMPONENTS.load(Ordering::Relaxed),
96            despawn: DESPAWN.load(Ordering::Relaxed),
97            insert_component: INSERT_COMPONENT.load(Ordering::Relaxed),
98            remove_component: REMOVE_COMPONENT.load(Ordering::Relaxed),
99            noop: NOOP.load(Ordering::Relaxed),
100            other: OTHER.load(Ordering::Relaxed),
101            payload_components: PAYLOAD_COMPONENTS.load(Ordering::Relaxed),
102        }
103    }
104}
105
106use log::info;
107use naia_socket_shared::Instant;
108
109use crate::world::sync::RemoteEntityChannel;
110use crate::world::update::entity_update_manager::EntityUpdateManager;
111use crate::{
112    messages::channels::receivers::reliable_receiver::ReliableReceiver,
113    sequence_list::SequenceList,
114    types::{HostType, PacketIndex},
115    world::{
116        entity::entity_converters::GlobalWorldManagerType,
117        host::host_world_manager::{CommandId, HostWorldManager},
118        remote::remote_entity_waitlist::{RemoteEntityWaitlist, WaitlistStore},
119        sync::HostEntityChannel,
120    },
121    ChannelSender, ComponentKind, ComponentKinds, ComponentUpdate, DiffMask,
122    EntityAndGlobalEntityConverter, EntityAuthStatus, EntityCommand, EntityConverterMut,
123    EntityEvent, EntityMessage, EntityMessageType, GlobalEntity, GlobalEntitySpawner, HostEntity,
124    InScopeEntities, LocalEntityAndGlobalEntityConverter, LocalEntityMap, MessageIndex,
125    OwnedLocalEntity, PacketNotifiable, ReliableSender, RemoteEntity, RemoteWorldManager,
126    Replicate, Tick, WorldMutType, WorldRefType,
127};
128
129cfg_if! {
130    if #[cfg(feature = "e2e_debug")] {
131        use crate::world::{
132            host::host_world_manager::SubCommandId,
133            sync::remote_entity_channel::EntityChannelState,
134        };
135    }
136}
137
138const RESEND_COMMAND_RTT_FACTOR: f32 = 1.5;
139const COMMAND_RECORD_TTL: Duration = Duration::from_secs(60);
140
141type SentCommandPackets = SequenceList<(Instant, Vec<(CommandId, EntityMessage<OwnedLocalEntity>)>)>;
142type OutgoingEvents = (VecDeque<(CommandId, EntityCommand)>, HashMap<GlobalEntity, HashSet<ComponentKind>>);
143
144/// Unified manager for one connection's host-side and remote-side entity state, routing commands and events between them.
145pub struct LocalWorldManager {
146    entity_map: LocalEntityMap,
147    sender: ReliableSender<EntityCommand>,
148    sent_command_packets: SentCommandPackets,
149    receiver: ReliableReceiver<EntityMessage<OwnedLocalEntity>>,
150
151    host: HostWorldManager,
152    remote: RemoteWorldManager,
153    updater: EntityUpdateManager,
154
155    /// Entities with ScopeExit::Persist that are currently out-of-scope.
156    /// Replication is frozen for these entities until re-entry.
157    paused_entities: HashSet<GlobalEntity>,
158
159    // TODO: this is kind of specific to the receiver, put it somewhere else?
160    incoming_components: HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
161
162    // TODO: this is kind of specific to the updater, put it somewhere else?
163    incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
164}
165
166impl LocalWorldManager {
167    /// Creates a `LocalWorldManager` for the given address, host type, user key, and global world manager reference.
168    pub fn new(
169        address: &Option<SocketAddr>,
170        host_type: HostType,
171        user_key: u64,
172        global_world_manager: &dyn GlobalWorldManagerType,
173    ) -> Self {
174        Self {
175            entity_map: LocalEntityMap::new(host_type),
176            sender: ReliableSender::new(RESEND_COMMAND_RTT_FACTOR, None),
177            sent_command_packets: SequenceList::new(),
178            receiver: ReliableReceiver::new(),
179
180            host: HostWorldManager::new(host_type, user_key),
181            remote: RemoteWorldManager::new(host_type),
182            updater: EntityUpdateManager::new(address, global_world_manager),
183
184            paused_entities: HashSet::new(),
185
186            incoming_components: HashMap::new(),
187            incoming_updates: Vec::new(),
188        }
189    }
190
191    pub(crate) fn entity_waitlist_queue<T>(
192        &mut self,
193        remote_entity_set: &HashSet<RemoteEntity>,
194        waitlist_store: &mut WaitlistStore<T>,
195        message: T,
196    ) {
197        self.remote
198            .entity_waitlist_queue(remote_entity_set, waitlist_store, message);
199    }
200
201    // EntityMap-focused
202
203    /// Returns a read-only entity converter backed by the internal entity map.
204    pub fn entity_converter(&self) -> &dyn LocalEntityAndGlobalEntityConverter {
205        self.entity_map.entity_converter()
206    }
207
208    /// Returns a mutable entity converter that can also allocate new host entity IDs.
209    pub fn entity_converter_mut<'a, 'b>(
210        &'b mut self,
211        global_world_manager: &'a dyn GlobalWorldManagerType,
212    ) -> EntityConverterMut<'a, 'b> {
213        self.host
214            .entity_converter_mut(global_world_manager, &mut self.entity_map)
215    }
216
217    /// Returns `true` if `global_entity` is currently tracked by either the host or remote engine.
218    pub fn has_global_entity(&self, global_entity: &GlobalEntity) -> bool {
219        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
220            return false;
221        };
222        self.has_local_entity(&local_entity)
223    }
224
225    /// Returns `true` if `local_entity` is currently registered in its respective engine.
226    pub fn has_local_entity(&self, local_entity: &OwnedLocalEntity) -> bool {
227        match local_entity {
228            OwnedLocalEntity::Host { id, is_static: true } => {
229                self.host.has_entity(&HostEntity::new_static(*id))
230            }
231            OwnedLocalEntity::Host { id, is_static: false } => {
232                self.host.has_entity(&HostEntity::new(*id))
233            }
234            OwnedLocalEntity::Remote { id, is_static } => {
235                let remote = if *is_static { RemoteEntity::new_static(*id) } else { RemoteEntity::new(*id) };
236                self.remote.has_entity(&remote)
237            }
238        }
239    }
240
241    /// Get a reference to a HostEntityChannel (for testing)
242    pub fn get_host_entity_channel(
243        &self,
244        entity: &HostEntity,
245    ) -> Option<&crate::world::sync::HostEntityChannel> {
246        self.host.get_entity_channel(entity)
247    }
248
249    /// Get a mutable reference to a HostEntityChannel (for testing)
250    pub fn get_host_entity_channel_mut(
251        &mut self,
252        entity: &HostEntity,
253    ) -> Option<&mut crate::world::sync::HostEntityChannel> {
254        self.host.get_entity_channel_mut(entity)
255    }
256
257    // Host-focused
258
259    /// Returns `true` if `host_entity` is currently tracked by the host engine.
260    pub fn has_host_entity(&self, host_entity: &HostEntity) -> bool {
261        self.host.has_entity(host_entity)
262    }
263
264    /// Allocates a host entity ID and enqueues the initial spawn command(s) when `global_entity` enters connection scope.
265    pub fn host_init_entity(
266        &mut self,
267        global_entity: &GlobalEntity,
268        component_kinds: Vec<ComponentKind>,
269        component_kinds_map: &ComponentKinds,
270        is_static: bool,
271    ) {
272        // Stale-mapping detection (per [entity-delegation-15] / scope re-entry):
273        // If the entity is mapped but its HostEntityChannel has already been
274        // removed from the HostEngine, that means a Despawn was sent (channel
275        // removed synchronously) and the ACK hasn't yet recycled the id from
276        // the entity_map. The remaining map entry is stale. Evict it so that
277        // we allocate a *fresh* HostEntity below — otherwise the stale ACK,
278        // when it arrives, would call `on_delivered_despawn_entity` and wipe
279        // the new mapping (since recycled HostEntity ids would alias).
280        if let Ok(existing_host_entity) =
281            self.entity_map.global_entity_to_host_entity(global_entity)
282        {
283            let channel_alive = self
284                .host
285                .get_host_world()
286                .contains_key(&existing_host_entity);
287            if !channel_alive {
288                // Stale mapping. Drop it without recycling the id (the in-flight
289                // Despawn ACK will recycle later via the idempotent path in
290                // `HostEntityGenerator::remove_by_host_entity`).
291                self.entity_map.remove_by_global_entity(global_entity);
292            }
293        }
294
295        if self
296            .entity_map
297            .global_entity_to_host_entity(global_entity)
298            .is_err()
299        {
300            // this is done because `host_reserve_entity()` may have been called previously!
301            if is_static {
302                let host_entity = self.host.host_generate_static_entity();
303                self.entity_map
304                    .insert_with_static_host_entity(*global_entity, host_entity);
305            } else {
306                let host_entity = self.host.host_generate_entity();
307                self.entity_map
308                    .insert_with_host_entity(*global_entity, host_entity);
309            }
310        }
311
312        if is_static {
313            self.host.init_static_entity_send_host_commands(
314                &self.entity_map,
315                global_entity,
316                component_kinds,
317            );
318        } else {
319            self.host.init_entity_send_host_commands(
320                &self.entity_map,
321                global_entity,
322                component_kinds,
323                &mut self.updater,
324                component_kinds_map,
325            );
326        }
327    }
328
329    /// BULLETPROOF: Migrate entity from remote (client) control to host (server) control
330    ///
331    /// This method performs a complete, atomic migration of an entity from client control
332    /// to server control, including:
333    /// - Force-draining all buffered operations
334    /// - Preserving component state
335    /// - Installing entity redirects
336    /// - Updating command references
337    /// - Cleaning up old entity channels
338    ///
339    /// # Errors
340    ///
341    /// This method will panic if:
342    /// - The entity doesn't exist in the local entity map
343    /// - The entity is not currently remote-owned
344    /// - Any step of the migration process fails
345    ///
346    /// # Safety
347    ///
348    /// This method is designed to be atomic - either the entire migration succeeds
349    /// or the system remains in a consistent state. No partial migrations are possible.
350    pub fn migrate_entity_remote_to_host(
351        &mut self,
352        global_entity: &GlobalEntity,
353    ) -> Result<HostEntity, String> {
354        // Validate entity exists and is remote-owned
355        let Some(local_entity_record) = self.entity_map.remove_by_global_entity(global_entity)
356        else {
357            return Err(format!(
358                "Entity does not exist in local entity map: {:?}",
359                global_entity
360            ));
361        };
362
363        if !local_entity_record.is_remote_owned() {
364            // Restore the entity record since we removed it
365            self.entity_map
366                .insert_with_remote_entity(*global_entity, local_entity_record.remote_entity());
367            return Err(format!("Entity is not remote-owned: {:?}", global_entity));
368        }
369        let old_remote_entity = local_entity_record.remote_entity();
370
371        // create new host entity, insert into local entity map
372        let new_host_entity = self.host.host_generate_entity();
373
374        self.entity_map
375            .insert_with_host_entity(*global_entity, new_host_entity);
376
377        // CRITICAL: After migration, global_entity_to_remote_entity() must fail for this global_entity
378        // remove_by_global_entity should have removed the remote mapping, but verify it's gone
379        // This prevents SetAuthority from encoding via stale global->remote mapping
380        // Double-check: ensure old remote mapping is completely removed from remote_to_global
381        self.entity_map
382            .remove_remote_mapping_if_exists(&old_remote_entity);
383
384        // Verify the invariant: after migration, global_entity should NOT convert to remote_entity
385        // This is a defensive check - if this fails, there's a bug in remove_by_global_entity
386        debug_assert!(
387            self.entity_map
388                .entity_converter()
389                .global_entity_to_remote_entity(global_entity)
390                .is_err(),
391            "After migration, global_entity_to_remote_entity must fail for migrated entity"
392        );
393
394        // BULLETPROOF: Step 1: Force-drain all buffers in RemoteEntityChannel
395        // This ensures all pending operations are processed before migration
396        self.remote.force_drain_entity_buffers(&old_remote_entity);
397
398        // BULLETPROOF: Step 2: Extract component state from RemoteEntityChannel
399        // This preserves the current component state during migration
400        let component_kinds = self.remote.extract_component_kinds(&old_remote_entity);
401
402        // BULLETPROOF: Step 3: Remove RemoteEntityChannel from RemoteEngine
403        // This must succeed or we're in an inconsistent state
404        let _old_remote_channel = self.remote.remove_entity_channel(&old_remote_entity);
405
406        // BULLETPROOF: Step 4: Create new HostEntityChannel with extracted component state
407        // This creates the new server-side entity channel with preserved state
408        let new_host_channel =
409            HostEntityChannel::new_with_components(self.entity_map.host_type(), component_kinds);
410
411        // BULLETPROOF: Step 5: Insert new HostEntityChannel into HostEngine
412        // This must succeed or we lose the entity channel
413        self.host
414            .insert_entity_channel(new_host_entity, new_host_channel);
415
416        // BULLETPROOF: Step 6: Install entity redirect in LocalEntityMap
417        // This allows old entity references to be automatically updated
418        let old_entity = old_remote_entity.copy_to_owned();
419        let new_entity = OwnedLocalEntity::Host { id: new_host_entity.value(), is_static: false };
420        self.entity_map
421            .install_entity_redirect(old_entity, new_entity);
422
423        // BULLETPROOF: Step 7: Update all references in sent_command_packets
424        // This ensures pending commands are sent to the correct entity
425        self.update_sent_command_entity_refs(global_entity, old_entity, new_entity);
426
427        // BULLETPROOF: Step 8: Clean up old remote entity
428        // This removes the old client-side entity channel
429        self.remote
430            .despawn_entity(&mut self.entity_map, &old_remote_entity);
431
432        Ok(new_host_entity)
433    }
434
435    /// Sends an `EnableDelegation` command to the remote peer via the host engine.
436    pub fn host_send_enable_delegation(&mut self, global_entity: &GlobalEntity) {
437        let command = EntityCommand::EnableDelegation(None, *global_entity);
438        self.host.send_command(&self.entity_map, command);
439    }
440
441    /// Forces the `HostEntityChannel` for `host_entity` into the Delegated state locally without sending a wire message.
442    pub fn host_local_enable_delegation(&mut self, host_entity: &HostEntity) {
443        let Some(channel) = self.host.get_entity_channel_mut(host_entity) else {
444            panic!(
445                "Cannot enable delegation on non-existent HostEntity: {:?}",
446                host_entity
447            );
448        };
449        channel.local_enable_delegation();
450    }
451
452    /// Sends a `MigrateResponse` command to notify the peer that an entity has migrated to the server's control.
453    pub fn host_send_migrate_response(
454        &mut self,
455        global_entity: &GlobalEntity,
456        old_remote_entity: &RemoteEntity, // Server's RemoteEntity (represents client's entity)
457        new_host_entity: &HostEntity,     // Server's new HostEntity (what server created)
458    ) {
459        // EntityCommand::MigrateResponse signature: (subid, global, RemoteEntity, HostEntity)
460        // These types are from SERVER perspective and will be reinterpreted by CLIENT
461        let command = EntityCommand::MigrateResponse(
462            None,
463            *global_entity,
464            *old_remote_entity,
465            *new_host_entity,
466        );
467        self.host.send_command(&self.entity_map, command);
468    }
469
470    #[track_caller]
471    /// Sends a `SetAuthority` command for `global_entity` with the given `auth_status`.
472    pub fn host_send_set_auth(
473        &mut self,
474        global_entity: &GlobalEntity,
475        auth_status: EntityAuthStatus,
476    ) {
477        #[cfg(feature = "e2e_debug")]
478        {
479            crate::e2e_trace!(
480                "[SERVER_SEND] SetAuthority entity={:?} status={:?}",
481                global_entity,
482                auth_status
483            );
484        }
485        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
486            panic!("Attempting to send SetAuthority for entity which does not exist in local entity map! {:?}", global_entity);
487        };
488
489        let command = EntityCommand::SetAuthority(None, *global_entity, auth_status);
490        if local_entity.is_host() {
491            self.host.send_command(&self.entity_map, command);
492        } else {
493            // For RemoteEntity, use remote.send_auth_command (similar to send_publish)
494            self.remote
495                .send_auth_command(self.entity_map.entity_converter(), command);
496        }
497    }
498
499    /// Pre-allocates a `HostEntity` slot for `global_entity` before it is sent to the peer.
500    pub fn host_reserve_entity(&mut self, global_entity: &GlobalEntity) -> HostEntity {
501        self.host
502            .host_reserve_entity(&mut self.entity_map, global_entity)
503    }
504
505    /// Removes and returns any previously reserved `HostEntity` for `global_entity`, if one exists.
506    pub fn host_remove_reserved_entity(
507        &mut self,
508        global_entity: &GlobalEntity,
509    ) -> Option<HostEntity> {
510        self.host.host_removed_reserved_entity(global_entity)
511    }
512
513    pub(crate) fn insert_sent_command_packet(&mut self, packet_index: &PacketIndex, now: Instant) {
514        if !self
515            .sent_command_packets
516            .contains_scan_from_back(packet_index)
517        {
518            self.sent_command_packets
519                .insert_scan_from_back(*packet_index, (now, Vec::new()));
520        }
521    }
522
523    pub(crate) fn record_command_written(
524        &mut self,
525        packet_index: &PacketIndex,
526        command_id: &CommandId,
527        message: EntityMessage<OwnedLocalEntity>,
528    ) {
529        #[cfg(feature = "bench_instrumentation")]
530        {
531            use std::sync::atomic::Ordering;
532            match &message {
533                EntityMessage::Spawn(_) => {
534                    cmd_emission_counters::SPAWN.fetch_add(1, Ordering::Relaxed);
535                }
536                EntityMessage::SpawnWithComponents(_, kinds) => {
537                    cmd_emission_counters::SPAWN_WITH_COMPONENTS
538                        .fetch_add(1, Ordering::Relaxed);
539                    cmd_emission_counters::PAYLOAD_COMPONENTS
540                        .fetch_add(kinds.len() as u64, Ordering::Relaxed);
541                }
542                EntityMessage::Despawn(_) => {
543                    cmd_emission_counters::DESPAWN.fetch_add(1, Ordering::Relaxed);
544                }
545                EntityMessage::InsertComponent(_, _) => {
546                    cmd_emission_counters::INSERT_COMPONENT.fetch_add(1, Ordering::Relaxed);
547                }
548                EntityMessage::RemoveComponent(_, _) => {
549                    cmd_emission_counters::REMOVE_COMPONENT.fetch_add(1, Ordering::Relaxed);
550                }
551                EntityMessage::Noop => {
552                    cmd_emission_counters::NOOP.fetch_add(1, Ordering::Relaxed);
553                }
554                _ => {
555                    cmd_emission_counters::OTHER.fetch_add(1, Ordering::Relaxed);
556                }
557            }
558        }
559
560        let (_, sent_actions_list) = self
561            .sent_command_packets
562            .get_mut_scan_from_back(packet_index)
563            .unwrap();
564        sent_actions_list.push((*command_id, message));
565    }
566
567    // Remote-focused
568
569    /// Returns the [`GlobalEntity`] list for all entities currently tracked as remote-owned.
570    pub fn remote_entities(&self) -> Vec<GlobalEntity> {
571        self.entity_map.remote_entities()
572    }
573
574    #[cfg(feature = "e2e_debug")]
575    pub fn debug_remote_channel_diagnostic(
576        &self,
577        remote_entity: &RemoteEntity,
578    ) -> Option<(
579        EntityChannelState,
580        (SubCommandId, usize, Option<SubCommandId>, usize),
581    )> {
582        self.remote.debug_channel_diagnostic(remote_entity)
583    }
584
585    #[cfg(feature = "e2e_debug")]
586    pub fn debug_remote_channel_snapshot(
587        &self,
588        remote_entity: &RemoteEntity,
589    ) -> Option<(
590        EntityChannelState,
591        Option<MessageIndex>,
592        usize,
593        Option<(MessageIndex, EntityMessageType)>,
594        Option<MessageIndex>,
595    )> {
596        self.remote.debug_channel_snapshot(remote_entity)
597    }
598
599    /// Sends an `EnableDelegationResponse` acknowledgement to the server after receiving an `EnableDelegation` message.
600    pub fn send_enable_delegation_response(&mut self, global_entity: &GlobalEntity) {
601        let command = EntityCommand::EnableDelegationResponse(None, *global_entity);
602        self.remote.send_auth_command(&self.entity_map, command);
603    }
604
605    /// Sends a `RequestAuthority` command for `global_entity` via the remote engine.
606    pub fn remote_send_request_auth(&mut self, global_entity: &GlobalEntity) {
607        let command = EntityCommand::RequestAuthority(None, *global_entity);
608        self.remote.send_auth_command(&self.entity_map, command);
609    }
610
611    /// Update the RemoteEntityChannel's AuthChannel status (used after migration)
612    pub fn remote_receive_set_auth(
613        &mut self,
614        global_entity: &GlobalEntity,
615        auth_status: EntityAuthStatus,
616    ) {
617        let remote_entity = self
618            .entity_map
619            .entity_converter()
620            .global_entity_to_remote_entity(global_entity)
621            .unwrap();
622        self.remote
623            .receive_set_auth_status(remote_entity, auth_status);
624    }
625
626    /// Get auth status of a remote entity's channel (for testing)
627    pub fn get_remote_entity_auth_status(
628        &self,
629        global_entity: &GlobalEntity,
630    ) -> Option<EntityAuthStatus> {
631        let Ok(owned) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
632            return None;
633        };
634        let OwnedLocalEntity::Remote { .. } = owned else {
635            return None;
636        };
637        self.remote
638            .get_entity_auth_status(&owned.take_remote())
639    }
640
641    /// Returns a mutable reference to the entity waitlist managed by the remote engine.
642    pub fn entity_waitlist_mut(&mut self) -> &mut RemoteEntityWaitlist {
643        self.remote.entity_waitlist_mut()
644    }
645
646    /// Buffers an incoming entity message at the given sequence `id` for ordered delivery.
647    pub fn receiver_buffer_message(
648        &mut self,
649        id: MessageIndex,
650        msg: EntityMessage<OwnedLocalEntity>,
651    ) {
652        // if msg.get_type() != EntityMessageType::Noop {
653        //     use log::info;
654        //     info!(
655        //         "LocalWorldManager::receiver_buffer_message(id={}, msg_type={:?})",
656        //         id,
657        //         msg.get_type()
658        //     );
659        // }
660
661        self.receiver.buffer_message(id, msg);
662    }
663
664    pub(crate) fn insert_received_component(
665        &mut self,
666        local_entity: &OwnedLocalEntity,
667        component_kind: &ComponentKind,
668        component: Box<dyn Replicate>,
669    ) {
670        self.incoming_components
671            .insert((*local_entity, *component_kind), component);
672    }
673
674    pub(crate) fn insert_received_update(
675        &mut self,
676        tick: Tick,
677        local_entity: &OwnedLocalEntity,
678        component_update: ComponentUpdate,
679    ) {
680        self.incoming_updates
681            .push((tick, *local_entity, component_update));
682    }
683
684    /// Drains all buffered incoming messages and update events, applies them to `world`, and returns the resulting [`EntityEvent`]s.
685    pub fn take_incoming_events<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
686        &mut self,
687        spawner: &mut dyn GlobalEntitySpawner<E>,
688        global_world_manager: &dyn GlobalWorldManagerType,
689        component_kinds: &ComponentKinds,
690        world: &mut W,
691        now: &Instant,
692    ) -> Vec<EntityEvent> {
693        let incoming_messages = self.receiver.receive_messages();
694        let mut incoming_host_messages = Vec::new();
695        let mut incoming_remote_messages = Vec::new();
696
697        for (id, incoming_message) in incoming_messages {
698            if incoming_message.get_type() == EntityMessageType::Noop {
699                continue; // skip noop messages
700            }
701
702            // use log::info;
703            // info!(
704            //     "LocalWorldManager::take_incoming_events - processing message: id={}, type={:?}",
705            //     id,
706            //     incoming_message.get_type()
707            // );
708
709            let Some(local_entity) = incoming_message.entity() else {
710                panic!(
711                    "Received message without an entity! Message: {:?}",
712                    incoming_message
713                );
714            };
715            match local_entity {
716                OwnedLocalEntity::Host { id: host_entity, is_static } => {
717                    // Host entity message
718                    let host_entity = if is_static { HostEntity::new_static(host_entity) } else { HostEntity::new(host_entity) };
719                    incoming_host_messages.push((id, incoming_message.with_entity(host_entity)));
720                }
721                OwnedLocalEntity::Remote { .. } => {
722                    // Remote entity message
723                    let remote_entity = local_entity.take_remote();
724                    // Count when Spawn is routed to incoming_remote_messages
725                    #[cfg(feature = "e2e_debug")]
726                    if incoming_message.get_type() == EntityMessageType::Spawn {
727                        extern "Rust" {
728                            fn client_routed_remote_spawn_increment();
729                        }
730                        // Safety: atomic counter defined by the naia-tests harness under
731                        // e2e_debug; no preconditions; never active in production builds.
732                        unsafe {
733                            client_routed_remote_spawn_increment();
734                        }
735                    }
736                    incoming_remote_messages
737                        .push((id, incoming_message.with_entity(remote_entity)));
738                }
739            }
740        }
741
742        let host_events = self.host.take_incoming_events(
743            spawner,
744            global_world_manager,
745            &self.entity_map,
746            world,
747            incoming_host_messages,
748        );
749        let mut remote_events = self.remote.take_incoming_events(
750            spawner,
751            global_world_manager,
752            &mut self.entity_map,
753            component_kinds,
754            world,
755            now,
756            &mut self.incoming_components,
757            std::mem::take(&mut self.incoming_updates),
758            incoming_remote_messages,
759        );
760
761        let mut incoming_events = host_events;
762        incoming_events.append(&mut remote_events);
763
764        incoming_events
765    }
766
767    /// Registers `global_entity` as authority-granted, enabling update tracking for its components.
768    pub fn register_authed_entity(
769        &mut self,
770        global_manager: &dyn GlobalWorldManagerType,
771        global_entity: &GlobalEntity,
772    ) {
773        // info!("Registering authed entity: {:?}", global_entity);
774
775        if let Ok(remote_entity) = self
776            .entity_map
777            .global_entity_to_remote_entity(global_entity)
778        {
779            self.remote.register_authed_entity(&remote_entity);
780        }
781
782        let Some(component_kinds) = global_manager.component_kinds(global_entity) else {
783            // entity has no components yet
784            return;
785        };
786
787        for component_kind in component_kinds.iter() {
788            self.updater
789                .register_component(global_entity, component_kind);
790        }
791    }
792
793    /// Deregisters `global_entity` from authority, stopping update tracking for its components.
794    pub fn deregister_authed_entity(
795        &mut self,
796        global_manager: &dyn GlobalWorldManagerType,
797        global_entity: &GlobalEntity,
798    ) {
799        // info!("Deregistering delegated entity updates for {:?}", global_entity);
800
801        if let Ok(remote_entity) = self
802            .entity_map
803            .global_entity_to_remote_entity(global_entity)
804        {
805            self.remote.deregister_authed_entity(&remote_entity);
806        }
807
808        let Some(component_kinds) = global_manager.component_kinds(global_entity) else {
809            // entity has no components yet
810            return;
811        };
812
813        for component_kind in component_kinds.iter() {
814            self.updater
815                .deregister_component(global_entity, component_kind);
816        }
817    }
818
819    /// Notifies the remote waitlist that `global_entity`'s remote entity has been spawned.
820    pub fn remote_spawn_entity(&mut self, global_entity: &GlobalEntity) {
821        let remote_entity = self
822            .entity_map
823            .global_entity_to_remote_entity(global_entity)
824            .unwrap();
825        self.remote.spawn_entity(&remote_entity);
826    }
827
828    /// Despawns the remote entity mapped from `global_entity` and cleans up the entity map.
829    pub fn remote_despawn_entity(&mut self, global_entity: &GlobalEntity) {
830        let remote_entity = self
831            .entity_map
832            .global_entity_to_remote_entity(global_entity)
833            .unwrap();
834        self.remote
835            .despawn_entity(&mut self.entity_map, &remote_entity);
836    }
837
838    // Update-focused
839
840    pub(crate) fn get_diff_mask(
841        &self,
842        global_entity: &GlobalEntity,
843        component_kind: &ComponentKind,
844    ) -> DiffMask {
845        self.updater.get_diff_mask(global_entity, component_kind)
846    }
847
848    pub(crate) fn record_update(
849        &mut self,
850        now: &Instant,
851        packet_index: &PacketIndex,
852        global_entity: &GlobalEntity,
853        component_kind: &ComponentKind,
854        diff_mask: DiffMask,
855    ) {
856        self.updater
857            .record_update(now, packet_index, global_entity, component_kind, diff_mask);
858    }
859
860    // Joint router
861
862    /// Sends a `Despawn` command for `global_entity` through whichever engine owns it.
863    pub fn despawn_entity(&mut self, global_entity: &GlobalEntity) {
864        // Clean up pause state if entity was Paused (ScopeExit::Persist)
865        self.paused_entities.remove(global_entity);
866
867        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
868            panic!(
869                "Attempting to despawn entity which does not exist in local entity map! {:?}",
870                global_entity
871            );
872        };
873        if local_entity.is_host() {
874            self.host
875                .send_command(&self.entity_map, EntityCommand::Despawn(*global_entity));
876        } else {
877            self.remote
878                .send_entity_command(&self.entity_map, EntityCommand::Despawn(*global_entity));
879        }
880    }
881
882    /// Pause replication for a `ScopeExit::Persist` entity that has left scope.
883    /// The entity stays in the client's entity pool; no further updates are sent
884    /// until `resume_entity` is called on re-entry.
885    pub fn pause_entity(&mut self, global_entity: &GlobalEntity) {
886        self.paused_entities.insert(*global_entity);
887    }
888
889    /// Resume replication for a paused `ScopeExit::Persist` entity that has re-entered scope.
890    /// Accumulated deltas will be delivered on the next update cycle.
891    pub fn resume_entity(&mut self, global_entity: &GlobalEntity) {
892        self.paused_entities.remove(global_entity);
893    }
894
895    /// Returns `true` if `global_entity` is currently paused (scope-exited with `ScopeExit::Persist`).
896    pub fn is_entity_paused(&self, global_entity: &GlobalEntity) -> bool {
897        self.paused_entities.contains(global_entity)
898    }
899
900    /// Sends an `InsertComponent` command for `global_entity`, routing through host or remote engine as appropriate.
901    pub fn insert_component(
902        &mut self,
903        global_entity: &GlobalEntity,
904        component_kind: &ComponentKind,
905    ) {
906        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
907            panic!("Attempting to insert component for entity which does not exist in local entity map! {:?}", global_entity);
908        };
909        if local_entity.is_host() {
910            // Register component immediately when it comes into scope (not waiting for delivery confirmation)
911            // This ensures mutations can set the diff mask right away
912            self.updater
913                .register_component(global_entity, component_kind);
914            self.host.send_command(
915                &self.entity_map,
916                EntityCommand::InsertComponent(*global_entity, *component_kind),
917            );
918        } else {
919            self.remote.send_entity_command(
920                &self.entity_map,
921                EntityCommand::InsertComponent(*global_entity, *component_kind),
922            );
923        }
924    }
925
926    /// Sends a `RemoveComponent` command for `global_entity`, routing through host or remote engine as appropriate.
927    pub fn remove_component(
928        &mut self,
929        global_entity: &GlobalEntity,
930        component_kind: &ComponentKind,
931    ) {
932        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
933            panic!("Attempting to remove component for entity which does not exist in local entity map! {:?}", global_entity);
934        };
935        if local_entity.is_host() {
936            self.host.send_command(
937                &self.entity_map,
938                EntityCommand::RemoveComponent(*global_entity, *component_kind),
939            );
940        } else {
941            self.remote.send_entity_command(
942                &self.entity_map,
943                EntityCommand::RemoveComponent(*global_entity, *component_kind),
944            );
945        }
946    }
947
948    /// Sends a `Publish` command for `global_entity`, routing through host or remote engine based on ownership.
949    pub fn send_publish(&mut self, host_type: HostType, global_entity: &GlobalEntity) {
950        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
951            panic!(
952                "Attempting to publish entity which does not exist in local entity map! {:?}",
953                global_entity
954            );
955        };
956        let host_owned = match (host_type, local_entity.is_host()) {
957            (HostType::Server, true) => {
958                panic!("Server-owned Entities are published by default, invalid!")
959            }
960            (HostType::Client, false) => {
961                panic!("Server-owned Entities are published by default, invalid!")
962            }
963            (HostType::Server, false) => false, // todo!("server is attempting to publish a client-owned non-public remote entity"),
964            (HostType::Client, true) => true, // todo!("client is attempting to publish a client-owned host entity"),
965        };
966
967        let command = EntityCommand::Publish(None, *global_entity);
968        if host_owned {
969            self.host.send_command(&self.entity_map, command);
970        } else {
971            self.remote
972                .send_auth_command(self.entity_map.entity_converter(), command);
973        }
974    }
975
976    /// Sends an `Unpublish` command for `global_entity`, routing through host or remote engine based on ownership.
977    pub fn send_unpublish(&mut self, host_type: HostType, global_entity: &GlobalEntity) {
978        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
979            panic!(
980                "Attempting to publish entity which does not exist in local entity map! {:?}",
981                global_entity
982            );
983        };
984        let host_owned = match (host_type, local_entity.is_host()) {
985            (HostType::Server, true) => panic!("Server-owned Entities cannot be unpublished!"),
986            (HostType::Client, false) => panic!("Server-owned Entities cannot be unpublished!"),
987            (HostType::Server, false) => false, // todo!("server is attempting to unpublish a client-owned public entity"),
988            (HostType::Client, true) => true, // todo!("client is attempting to unpublish a client-owned public entity"),
989        };
990        let command = EntityCommand::Unpublish(None, *global_entity);
991        if host_owned {
992            self.host.send_command(&self.entity_map, command);
993        } else {
994            self.remote
995                .send_auth_command(self.entity_map.entity_converter(), command);
996        }
997    }
998
999    /// Sends an `EnableDelegation` command (with optional preceding `Publish`) for `global_entity`.
1000    pub fn send_enable_delegation(
1001        &mut self,
1002        host_type: HostType,
1003        origin_is_owning_client: bool,
1004        global_entity: &GlobalEntity,
1005    ) {
1006        // let is_delegated = self.entity_map.global_entity_is_delegated(global_entity);
1007        // if is_delegated {
1008        //     panic!("Entity {:?} is already delegated!", global_entity);
1009        // }
1010        let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
1011            panic!("Attempting to enable delegation for entity which does not exist in local entity map! {:?}", global_entity);
1012        };
1013        let host_owned = match (host_type, local_entity.is_host(), origin_is_owning_client) {
1014            (HostType::Server, false, true) => {
1015                panic!("Client cannot originate enable delegation for ANOTHER client-owned entity!")
1016            }
1017            (HostType::Client, _, false) => {
1018                panic!("Client must be the owning client to enable delegation!")
1019            }
1020            (HostType::Client, false, true) => {
1021                panic!("Client cannot enable delegation for a Server-owned entity")
1022            }
1023
1024            (HostType::Server, true, true) => true, // todo!("server is proxying client-originating enable delegation message to client (entity should be host-owned here)"),
1025            (HostType::Server, true, false) => true, // todo!("server is enabling delegation for a server-owned entity (host owned)"),
1026            (HostType::Client, true, true) => true, // todo!("client is attempting to enable delegation for a client-owned entity (host owned)"),
1027            (HostType::Server, false, false) => false, // todo!("server is attempting to delegate a (hopefully published) client-owned entity (remote-owned entity"),
1028        };
1029
1030        if host_owned {
1031            // Check if entity is already Published
1032            let host_entity = self
1033                .entity_map
1034                .global_entity_to_host_entity(global_entity)
1035                .expect("Host entity should exist");
1036
1037            let is_published = if let Some(channel) = self.get_host_entity_channel(&host_entity) {
1038                use crate::world::sync::auth_channel::EntityAuthChannelState;
1039                let state = channel.auth_channel_state();
1040                state == EntityAuthChannelState::Published
1041                    || state == EntityAuthChannelState::Delegated
1042            } else {
1043                false
1044            };
1045
1046            // Only send Publish if entity is NOT already Published
1047            if !is_published {
1048                let publish_command = EntityCommand::Publish(None, *global_entity);
1049                self.host.send_command(&self.entity_map, publish_command);
1050            }
1051
1052            // Always send EnableDelegation (this will transition Published → Delegated)
1053            #[cfg(feature = "e2e_debug")]
1054            crate::e2e_trace!(
1055                "[SERVER_SEND] EnableDelegation entity={:?} callsite=send_enable_delegation(host)",
1056                global_entity
1057            );
1058            let enable_delegation_command = EntityCommand::EnableDelegation(None, *global_entity);
1059            self.host
1060                .send_command(&self.entity_map, enable_delegation_command);
1061        } else {
1062            #[cfg(feature = "e2e_debug")]
1063            crate::e2e_trace!("[SERVER_SEND] EnableDelegation entity={:?} callsite=send_enable_delegation(remote)", global_entity);
1064            let command = EntityCommand::EnableDelegation(None, *global_entity);
1065            self.remote
1066                .send_auth_command(self.entity_map.entity_converter(), command);
1067        }
1068    }
1069
1070    #[track_caller]
1071    /// Sends a `DisableDelegation` command for `global_entity` via the host engine.
1072    pub fn send_disable_delegation(&mut self, global_entity: &GlobalEntity) {
1073        #[cfg(feature = "e2e_debug")]
1074        {
1075            let caller = std::panic::Location::caller();
1076            crate::e2e_trace!(
1077                "[SERVER_SEND] DisableDelegation entity={:?} caller={}:{}",
1078                global_entity,
1079                caller.file(),
1080                caller.line()
1081            );
1082        }
1083        // only server should ever be able to call this, on host-owned (server-owned) entities
1084        let command = EntityCommand::DisableDelegation(None, *global_entity);
1085        self.host.send_command(&self.entity_map, command);
1086    }
1087
1088    /// Sends a `ReleaseAuthority` command for `global_entity`, routing through whichever engine owns it.
1089    pub fn remote_send_release_auth(&mut self, global_entity: &GlobalEntity) {
1090        let command = EntityCommand::ReleaseAuthority(None, *global_entity);
1091
1092        let host_owned = self
1093            .entity_map
1094            .global_entity_to_owned_entity(global_entity)
1095            .unwrap()
1096            .is_host();
1097        if host_owned {
1098            self.host.send_command(&self.entity_map, command);
1099        } else {
1100            self.remote
1101                .send_auth_command(self.entity_map.entity_converter(), command);
1102        }
1103    }
1104
1105    // Joint
1106
1107    /// Processes dropped packet TTLs and handles update-packet retransmit logic for the current tick.
1108    pub fn collect_messages(&mut self, now: &Instant, rtt_millis: &f32) {
1109        self.handle_dropped_command_packets(now);
1110        self.updater.handle_dropped_update_packets(now, rtt_millis);
1111    }
1112
1113    fn handle_dropped_command_packets(&mut self, now: &Instant) {
1114        while let Some((_, (time_sent, _))) = self.sent_command_packets.front() {
1115            if time_sent.elapsed(now) > COMMAND_RECORD_TTL {
1116                self.sent_command_packets.pop_front();
1117            } else {
1118                break;
1119            }
1120        }
1121
1122        // Also cleanup old entity redirects with the same TTL
1123        self.entity_map.cleanup_old_redirects(now, 60);
1124    }
1125
1126    /// Collects pending outbound commands and component-update events, returning them as a pair of command queue and dirty-component map.
1127    pub fn take_outgoing_events<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
1128        &mut self,
1129        now: &Instant,
1130        rtt_millis: &f32,
1131        world: &W,
1132        converter: &dyn EntityAndGlobalEntityConverter<E>,
1133        global_world_manager: &dyn GlobalWorldManagerType,
1134    ) -> OutgoingEvents {
1135        // get outgoing world commands
1136        #[cfg(feature = "bench_instrumentation")]
1137        let t = std::time::Instant::now();
1138        let host_commands = self.host.take_outgoing_commands();
1139        let remote_commands = self.remote.take_outgoing_commands();
1140        for commands in [host_commands, remote_commands] {
1141            for command in commands {
1142                self.sender.send_message(command);
1143            }
1144        }
1145        #[cfg(feature = "bench_instrumentation")]
1146        {
1147            use std::sync::atomic::Ordering;
1148            bench_take_events_counters::NS_HOST_REMOTE_COMMANDS
1149                .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
1150        }
1151
1152        #[cfg(feature = "bench_instrumentation")]
1153        let t = std::time::Instant::now();
1154        self.sender.collect_messages(now, rtt_millis);
1155        let world_commands = self.sender.take_next_messages();
1156        #[cfg(feature = "bench_instrumentation")]
1157        {
1158            use std::sync::atomic::Ordering;
1159            bench_take_events_counters::NS_SENDER_COLLECT
1160                .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
1161        }
1162
1163        // get update events
1164        #[cfg(feature = "bench_instrumentation")]
1165        let t = std::time::Instant::now();
1166        let update_events = self.take_update_events(world, converter, global_world_manager);
1167        #[cfg(feature = "bench_instrumentation")]
1168        {
1169            use std::sync::atomic::Ordering;
1170            bench_take_events_counters::NS_TAKE_UPDATE_EVENTS
1171                .fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
1172        }
1173
1174        // return both
1175        (world_commands, update_events)
1176    }
1177
1178    /// Advances the delivery state machine, applying any newly acknowledged host-side commands to the delivered engine.
1179    pub fn process_delivered_commands(&mut self) {
1180        self.host
1181            .process_delivered_commands(&mut self.entity_map, &mut self.updater);
1182    }
1183
1184    /// Builds the dirty-component map for the current tick from mutation receivers and the world state.
1185    pub fn take_update_events<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
1186        &mut self,
1187        world: &W,
1188        world_converter: &dyn EntityAndGlobalEntityConverter<E>,
1189        global_world_manager: &dyn GlobalWorldManagerType,
1190    ) -> HashMap<GlobalEntity, HashSet<ComponentKind>> {
1191        let dirty = self.updater.build_dirty_candidates_from_receivers();
1192        let local_converter = self.entity_map.entity_converter();
1193        let mut updatable_world: HashMap<GlobalEntity, HashSet<ComponentKind>> = HashMap::new();
1194        for (global_entity, kinds) in dirty {
1195            if self.paused_entities.contains(&global_entity) {
1196                continue;
1197            }
1198            for kind in kinds {
1199                if self.host.is_component_updatable(local_converter, &global_entity, &kind)
1200                    || self.remote.is_component_updatable(local_converter, &global_entity, &kind)
1201                {
1202                    updatable_world.entry(global_entity).or_default().insert(kind);
1203                }
1204            }
1205        }
1206        self.updater.take_outgoing_events(world, world_converter, global_world_manager, updatable_world)
1207    }
1208
1209    // pub(crate) fn get_message_reader_helpers<'a, 'b, 'c, E: Copy + Eq + Hash + Sync + Send>(
1210    //     &'b mut self,
1211    //     spawner: &'b mut dyn GlobalEntitySpawner<E>
1212    // ) -> (GlobalEntityReserver<'a, 'b, 'c, E>, &'a mut EntityWaitlist<RemoteEntity>) {
1213    //     let remote= &mut self.remote;
1214    //     let entity_map = &mut self.entity_map;
1215    //     let reserver = remote.get_message_reader_helpers(entity_map, spawner);
1216    //     (reserver, remote.entity_waitlist_mut())
1217    // }
1218
1219    /// Returns the entity converter and entity waitlist as a pair, used during message deserialization.
1220    pub fn get_message_processor_helpers(
1221        &mut self,
1222    ) -> (
1223        &dyn LocalEntityAndGlobalEntityConverter,
1224        &mut RemoteEntityWaitlist,
1225    ) {
1226        let entity_converter = self.entity_map.entity_converter();
1227        let entity_waitlist = self.remote.entity_waitlist_mut();
1228        (entity_converter, entity_waitlist)
1229    }
1230
1231    fn host_notify_packet_delivered(&mut self, packet_index: PacketIndex) {
1232        if let Some((_, command_list)) = self
1233            .sent_command_packets
1234            .remove_scan_from_front(&packet_index)
1235        {
1236            for (command_id, command) in command_list {
1237                if self.sender.deliver_message(&command_id).is_some() {
1238                    self.deliver_message(command_id, command);
1239                }
1240            }
1241        }
1242    }
1243
1244    fn deliver_message(&mut self, id: CommandId, msg: EntityMessage<OwnedLocalEntity>) {
1245        if msg.is_noop() {
1246            return;
1247        }
1248        let Some(local_entity) = msg.entity() else {
1249            panic!("Delivered message without an entity! Message: {:?}", msg);
1250        };
1251        match local_entity {
1252            OwnedLocalEntity::Host { id: host_entity, is_static } => {
1253                // Host entity message
1254                let host_entity = if is_static { HostEntity::new_static(host_entity) } else { HostEntity::new(host_entity) };
1255                self.host.deliver_message(id, msg.with_entity(host_entity));
1256            }
1257            OwnedLocalEntity::Remote { .. } => {
1258                // Remote entity message
1259                let remote_entity = local_entity.take_remote();
1260                self.remote
1261                    .deliver_message(id, msg.with_entity(remote_entity));
1262            }
1263        }
1264    }
1265
1266    /// Patches all in-flight command packets to replace `old_entity` references with `new_entity`.
1267    pub fn update_sent_command_entity_refs(
1268        &mut self,
1269        _global_entity: &GlobalEntity,
1270        old_entity: OwnedLocalEntity,
1271        new_entity: OwnedLocalEntity,
1272    ) {
1273        // Iterate through sent_command_packets and update entity references
1274        for (_, (_, commands)) in self.sent_command_packets.iter_mut() {
1275            for (_, message) in commands.iter_mut() {
1276                if let Some(entity) = message.entity() {
1277                    if entity == old_entity {
1278                        *message = message.clone().with_entity(new_entity);
1279                    }
1280                }
1281            }
1282        }
1283    }
1284
1285    /// Extracts and returns all pending [`EntityCommand`]s from the host engine channel for `global_entity`.
1286    pub fn extract_host_entity_commands(
1287        &mut self,
1288        global_entity: &GlobalEntity,
1289    ) -> Vec<EntityCommand> {
1290        // Get host_entity from entity_map
1291        let host_entity = self
1292            .entity_map
1293            .global_entity_to_host_entity(global_entity)
1294            .unwrap();
1295        // Extract commands from host engine
1296        self.host.extract_entity_commands(&host_entity)
1297    }
1298
1299    /// Returns the set of component kinds currently registered on the host engine channel for `global_entity`.
1300    pub fn extract_host_component_kinds(
1301        &self,
1302        global_entity: &GlobalEntity,
1303    ) -> HashSet<ComponentKind> {
1304        // Get host_entity from entity_map
1305        let host_entity = self
1306            .entity_map
1307            .global_entity_to_host_entity(global_entity)
1308            .unwrap();
1309        // Get host_entity_channel from host engine
1310        let channel = self.host.get_entity_channel(&host_entity).unwrap();
1311        // Return component_channels clone
1312        channel.component_kinds().clone()
1313    }
1314
1315    /// Removes the host engine channel and entity map entry for `global_entity`.
1316    pub fn remove_host_entity(&mut self, global_entity: &GlobalEntity) {
1317        // Lookup host_entity FIRST before removing from entity_map
1318        let host_entity = self
1319            .entity_map
1320            .global_entity_to_host_entity(global_entity)
1321            .unwrap();
1322        // Remove from host engine
1323        self.host.remove_entity_channel(&host_entity);
1324        // Remove from entity_map LAST
1325        self.entity_map.remove_by_global_entity(global_entity);
1326    }
1327
1328    /// Registers a remote entity migrated from the host side into the remote engine with an initial component set.
1329    pub fn insert_remote_entity(
1330        &mut self,
1331        global_entity: &GlobalEntity,
1332        remote_entity: RemoteEntity,
1333        component_kinds: HashSet<ComponentKind>,
1334    ) {
1335        // Insert into entity_map
1336        self.entity_map
1337            .insert_with_remote_entity(*global_entity, remote_entity);
1338
1339        if self.remote.has_entity_channel(&remote_entity) {
1340            // Case: Channel was auto-created by messages arriving before the MigrateResponse event was processed
1341            // We need to upgrade this channel to be delegated and have the correct component state
1342            info!(
1343                "RemoteEntity({:?}) channel already exists (likely from out-of-order SetAuthority). Upgrading to Delegated.",
1344                remote_entity
1345            );
1346            let channel = self.remote.get_entity_channel_mut(&remote_entity).unwrap();
1347
1348            // Upgrade to delegated
1349            channel.configure_as_delegated();
1350
1351            // Set state to Spawned (if not already)
1352            // Note: We don't want to overwrite if it's already Spawned, but for migration we assume it should be
1353            channel.set_spawned(0);
1354
1355            // Insert component channels
1356            for component_kind in component_kinds {
1357                channel.insert_component_channel_as_inserted(component_kind, 0);
1358            }
1359        } else {
1360            // Normal Case: Create new delegated channel
1361            let mut channel = RemoteEntityChannel::new_delegated(self.entity_map.host_type());
1362
1363            // Set state to Spawned
1364            channel.set_spawned(0);
1365
1366            // For each component_kind, add RemoteComponentChannel with inserted=true
1367            for component_kind in component_kinds {
1368                channel.insert_component_channel_as_inserted(component_kind, 0);
1369            }
1370
1371            // Insert into remote engine
1372            self.remote.insert_entity_channel(remote_entity, channel);
1373        }
1374    }
1375
1376    /// Installs a redirect so lookups of `old` transparently resolve to `new` for a TTL period.
1377    pub fn install_entity_redirect(&mut self, old: OwnedLocalEntity, new: OwnedLocalEntity) {
1378        self.entity_map.install_entity_redirect(old, new);
1379    }
1380
1381    /// Returns the redirected entity for `entity` if a redirect is installed, otherwise returns `entity` unchanged.
1382    pub fn apply_entity_redirect(&self, entity: OwnedLocalEntity) -> OwnedLocalEntity {
1383        self.entity_map.apply_entity_redirect(&entity)
1384    }
1385
1386    /// Re-submits `command` for `global_entity` through the remote engine after a migration.
1387    pub fn replay_entity_command(&mut self, global_entity: &GlobalEntity, command: EntityCommand) {
1388        // Send command through appropriate channel (should be remote after migration)
1389        let _remote_entity = self
1390            .entity_map
1391            .global_entity_to_remote_entity(global_entity)
1392            .unwrap();
1393        self.remote.send_entity_command(&self.entity_map, command);
1394    }
1395}
1396
1397impl PacketNotifiable for LocalWorldManager {
1398    fn notify_packet_delivered(&mut self, packet_index: PacketIndex) {
1399        self.host_notify_packet_delivered(packet_index);
1400        self.updater.notify_packet_delivered(packet_index);
1401    }
1402}
1403
1404cfg_if! {
1405    if #[cfg(feature = "interior_visibility")] {
1406
1407        use crate::LocalEntity;
1408
1409        impl LocalWorldManager {
1410
1411            /// Returns all local entities currently tracked by the entity map.
1412            pub fn local_entities(&self) -> Vec<LocalEntity> {
1413                self.entity_map
1414                .iter()
1415                .map(|(_, record)| LocalEntity::from(record.owned_entity()))
1416                .collect::<Vec<LocalEntity>>()
1417            }
1418        }
1419    }
1420}
1421
1422#[cfg(feature = "test_utils")]
1423impl LocalWorldManager {
1424    #[doc(hidden)]
1425    pub fn diff_handler_receiver_count(&self) -> usize {
1426        self.updater.diff_handler_receiver_count()
1427    }
1428
1429    #[doc(hidden)]
1430    pub fn dirty_update_count(&self) -> usize {
1431        self.updater.dirty_candidates_len()
1432    }
1433}