Skip to main content

naia_shared/world/sync/
remote_entity_channel.rs

1//! ## `EntityChannel` – Per‑Entity Demultiplexer
2//!
3//! This module owns the **state machine and buffering logic for a *single
4//! entity*** travelling across an **unordered, reliable** transport.
5//!
6//! ---
7//! ### 1 · What problem does it solve?
8//! * Messages can arrive *out of order*
9//! * Certain message kinds must obey **strict causal order _within_ the
10//!   entity** (e.g. a component can’t be inserted before the entity exists).
11//!
12//! `EntityChannel` absorbs the raw `EntityMessage<()>` stream, re‑orders and
13//! filters it, and emits **ready‑to‑apply** messages in the *only* sequence
14//! the game‑logic needs to respect.
15//!
16//! ---
17//! ### 2 · State machine
18//!
19//! ```text
20//!                 +-----------------------------+
21//!                 |   Despawned (initial)       |
22//!                 +-----------------------------+
23//!                     | SpawnEntity(idₛ)  ▲
24//!                     v                   |
25//!                 +-----------------------------+
26//!                 |     Spawned                 |
27//!                 +-----------------------------+
28//!                     | DespawnEntity(id_d)     |
29//!                     +-------------------------+
30//! ```
31//!
32//! * **`Despawned`** – entity is not present; buffers *only* the next
33//!   `SpawnEntity` plus any later auth/component messages (they will flush
34//!   once the spawn occurs).
35//! * **`Spawned`** – entity is live; forwards component/auth messages to the
36//!   corresponding sub‑channels and drains their output immediately.
37//!
38//! ---
39//! ### 3 · Message ingest algorithm
40//! 1. **Gating by `last_epoch_id `**
41//!    A message whose `id ≤ last_epoch_id ` is *by definition* older than the
42//!    authoritative `SpawnEntity`; drop it to guarantee *at‑most‑once
43//!    semantics*; wrap‑around itself is handled automatically by the
44//!    wrap‑safe `u16` comparison helpers—no epoch reset is performed.
45//! 2. **Buffered queue (`OrderedIds`)**  
46//!    Messages are pushed into `buffered_messages`, ordered by the `u16`
47//!    sequence with wrap‑safe comparison.  
48//!    `process_messages()` iterates from the head while the next candidate is
49//!    *legal* under the current FSM state.
50//! 3. **Draining**  
51//!    Once a message is applied, it is moved into `outgoing_messages`.  
52//!    `Engine::drain_messages_into` later annotates them with the concrete
53//!    entity handle and forwards them to the ECS.
54//!
55//! ---
56//! ### 4 · Sub‑channels
57//! * **`AuthChannel`** – publishes, unpublishes, and delegates authority.
58//! * **`ComponentChannel`** (one per `ComponentKind`) – tracks insert/remove
59//!   toggles, guaranteeing idempotency via its own `last_insert_id` guard.
60//!
61//! `EntityChannel` coordinates these sub‑channels but *never* peers inside
62//! their logic; it merely aligns their buffers with the entity’s lifecycle
63//! (e.g., flush everything ≤ `idₛ` at spawn, reset on despawn).
64//!
65//! ---
66//! ### 5 · Key invariants
67//! * **Spawn barrier** – No component/auth message can overtake the spawn
68//!   that legitimises it.
69//! * **Monotonic visibility** – Once a message has been emitted to
70//!   `outgoing_messages`, the channel guarantees it will never retract or
71//!   reorder that message.
72//!
73//! Together, these guarantees let higher layers treat the engine as if every
74//! entity had its own perfect *ordered* stream—while the network enjoys the
75//! performance of a single unordered reliable channel.
76
77use std::{
78    collections::{HashMap, HashSet},
79    hash::Hash,
80};
81
82use crate::{
83    sequence_less_than, world::sync::remote_component_channel::RemoteComponentChannel,
84    ComponentKind, EntityAuthStatus, EntityCommand, EntityMessage, EntityMessageType, HostType,
85    MessageIndex,
86};
87
88cfg_if! {
89    if #[cfg(feature = "e2e_debug")] {
90        use crate::world::host::host_world_manager::SubCommandId;
91    }
92}
93
94/// Spawn/despawn lifecycle state of a remote entity channel.
95#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96#[cfg_attr(feature = "e2e_debug", allow(dead_code))]
97pub enum EntityChannelState {
98    /// Entity has not yet been spawned (or has been despawned).
99    Despawned,
100    /// Entity is live; component and authority messages are forwarded immediately.
101    Spawned,
102}
103
104/// Per-entity demultiplexer that buffers and reorders incoming messages from an unordered reliable channel into a causal stream.
105pub struct RemoteEntityChannel {
106    state: EntityChannelState,
107    last_epoch_id: Option<MessageIndex>,
108
109    component_channels: HashMap<ComponentKind, RemoteComponentChannel>,
110    auth_channel: AuthChannel,
111
112    buffered_messages: OrderedIds<EntityMessage<()>>,
113    incoming_messages: Vec<EntityMessage<()>>,
114    outgoing_commands: Vec<EntityCommand>,
115}
116
117impl RemoteEntityChannel {
118    /// Creates a fresh `RemoteEntityChannel` in the `Despawned` state for `host_type`.
119    pub fn new(host_type: HostType) -> Self {
120        Self {
121            state: EntityChannelState::Despawned,
122            last_epoch_id: None,
123
124            component_channels: HashMap::new(),
125            auth_channel: AuthChannel::new(host_type),
126
127            buffered_messages: OrderedIds::new(),
128            incoming_messages: Vec::new(),
129            outgoing_commands: Vec::new(),
130        }
131    }
132
133    /// Create a RemoteEntityChannel for a delegated entity (used during migration)
134    ///
135    /// After migration, MigrateResponse has subcommand_id=0, so the next message (SetAuthority)
136    /// will have subcommand_id=1. We need to sync the receiver's next_subcommand_id accordingly.
137    pub fn new_delegated(host_type: HostType) -> Self {
138        let mut channel = Self::new(host_type);
139        channel.configure_as_delegated();
140        channel
141    }
142
143    /// Configures the auth sub-channel into the `Delegated` state, simulating `Publish → EnableDelegation` transitions.
144    pub fn configure_as_delegated(&mut self) {
145        // Set up the AuthChannel for a delegated entity
146        // This simulates the entity having gone through Publish → EnableDelegation
147        self.auth_channel.force_publish();
148        self.auth_channel.force_enable_delegation();
149        // Sync subcommand_id: MigrateResponse has subcommand_id=0, so next is 1
150        self.auth_channel.receiver_set_next_subcommand_id(1);
151    }
152
153    /// Overrides the authority status stored in the auth sub-channel to match the global tracker after migration.
154    pub fn update_auth_status(&mut self, auth_status: EntityAuthStatus) {
155        self.auth_channel.force_set_auth_status(auth_status);
156    }
157
158    /// Returns the current authority status recorded in the auth sub-channel.
159    pub fn auth_status(&self) -> Option<EntityAuthStatus> {
160        self.auth_channel.auth_status()
161    }
162
163    /// Returns `true` if the auth sub-channel is in the Delegated state.
164    pub fn is_delegated(&self) -> bool {
165        self.auth_channel.is_delegated()
166    }
167
168    pub(crate) fn receive_message(&mut self, id: MessageIndex, msg: EntityMessage<()>) {
169        if let Some(last_epoch_id) = self.last_epoch_id {
170            if last_epoch_id == id {
171                panic!("EntityChannel received a message with the same id as the last epoch id. This should not happen. Message: {:?}", msg);
172            }
173
174            if sequence_less_than(id, last_epoch_id) {
175                // This message is older than the last spawn message, ignore it
176                return;
177            }
178        }
179
180        self.buffered_messages.push_back(id, msg);
181
182        self.process_messages();
183    }
184
185    /// Enqueues `command` through the authority sub-channel for outbound delivery.
186    pub fn send_command(&mut self, command: EntityCommand) {
187        self.auth_channel.send_command(command);
188        self.auth_channel
189            .sender_drain_messages_into(&mut self.outgoing_commands);
190    }
191
192    pub(crate) fn drain_incoming_messages_into<E: Copy + Hash + Eq>(
193        &mut self,
194        entity: E,
195        outgoing_events: &mut Vec<EntityMessage<E>>,
196    ) {
197        // Drain the entity channel and append the messages to the outgoing events
198        let mut received_messages = Vec::new();
199        for rmsg in std::mem::take(&mut self.incoming_messages) {
200            received_messages.push(rmsg.with_entity(entity));
201        }
202        outgoing_events.append(&mut received_messages);
203    }
204
205    pub(crate) fn drain_outgoing_messages_into(
206        &mut self,
207        outgoing_commands: &mut Vec<EntityCommand>,
208    ) {
209        outgoing_commands.append(&mut self.outgoing_commands);
210    }
211
212    pub(crate) fn has_component_kind(&self, component_kind: &ComponentKind) -> bool {
213        self.component_channels.contains_key(component_kind)
214    }
215
216    fn process_messages(&mut self) {
217        loop {
218            let Some((id, msg)) = self.buffered_messages.peek_front() else {
219                break;
220            };
221            let id = *id;
222
223            match msg.get_type() {
224                EntityMessageType::Spawn => {
225                    if self.state != EntityChannelState::Despawned {
226                        break;
227                    }
228
229                    self.state = EntityChannelState::Spawned;
230                    // Count when Spawn transitions state to Spawned
231                    #[cfg(feature = "e2e_debug")]
232                    {
233                        extern "Rust" {
234                            fn client_processed_spawn_increment();
235                        }
236                        // Safety: atomic counter defined by the naia-tests harness under
237                        // e2e_debug; no preconditions; never active in production builds.
238                        unsafe {
239                            client_processed_spawn_increment();
240                        }
241                    }
242                    self.last_epoch_id = Some(id);
243                    // clear buffered messages less than or equal to the last spawn id
244                    self.buffered_messages.pop_front_until_and_excluding(id);
245
246                    self.pop_front_into_outgoing();
247
248                    // Drain the auth channel and append the messages to the outgoing events
249                    self.auth_channel.receiver_buffer_pop_front_until_and_including(id);
250
251                    self.auth_channel.receiver_process_messages(self.state);
252                    self.auth_channel.receiver_drain_messages_into(&mut self.incoming_messages);
253
254                    // Pop buffered messages from the component channels until and excluding the spawn id
255                    // Then process the messages in the component channels
256                    // Then drain the messages into the outgoing messages
257                    for (component_kind, component_channel) in self.component_channels.iter_mut() {
258                        component_channel.buffer_pop_front_until_and_excluding(id);
259                        component_channel.process_messages(self.state);
260                        component_channel.drain_messages_into(component_kind, &mut self.incoming_messages);
261                    }
262                }
263                EntityMessageType::SpawnWithComponents => {
264                    if self.state != EntityChannelState::Despawned {
265                        break;
266                    }
267
268                    self.state = EntityChannelState::Spawned;
269                    self.last_epoch_id = Some(id);
270                    // Discard stale pre-lifetime messages buffered before this id
271                    self.buffered_messages.pop_front_until_and_excluding(id);
272
273                    // Pop the SpawnWithComponents message itself
274                    let (_, msg) = self.buffered_messages.pop_front().unwrap();
275                    let kinds = match msg {
276                        EntityMessage::SpawnWithComponents((), kinds) => kinds,
277                        _ => unreachable!(),
278                    };
279
280                    // Emit synthetic Spawn event
281                    self.incoming_messages.push(EntityMessage::Spawn(()));
282
283                    // Process any pre-buffered component channels (out-of-order arrivals)
284                    for (component_kind, component_channel) in self.component_channels.iter_mut() {
285                        component_channel.buffer_pop_front_until_and_excluding(id);
286                        component_channel.process_messages(self.state);
287                        component_channel.drain_messages_into(component_kind, &mut self.incoming_messages);
288                    }
289
290                    // Accept coalesced components: mark inserted + emit InsertComponent events
291                    for kind in &kinds {
292                        let component_channel = self.component_channels
293                            .entry(*kind)
294                            .or_insert_with(RemoteComponentChannel::new);
295                        component_channel.set_inserted(true, id);
296                        self.incoming_messages.push(EntityMessage::InsertComponent((), *kind));
297                    }
298
299                    // Drain auth channel
300                    self.auth_channel.receiver_buffer_pop_front_until_and_including(id);
301                    self.auth_channel.receiver_process_messages(self.state);
302                    self.auth_channel.receiver_drain_messages_into(&mut self.incoming_messages);
303                }
304                EntityMessageType::Despawn => {
305                    if self.state != EntityChannelState::Spawned {
306                        break;
307                    }
308
309                    self.state = EntityChannelState::Despawned;
310                    self.last_epoch_id = Some(id);
311
312                    self.auth_channel.reset();
313                    self.component_channels.clear();
314
315                    self.pop_front_into_outgoing();
316
317                    // clear the buffer
318                    self.buffered_messages.clear();
319                }
320                EntityMessageType::InsertComponent | EntityMessageType::RemoveComponent => {
321
322                    let (id, msg) = self.buffered_messages.pop_front().unwrap();
323                    let component_kind = msg.component_kind().unwrap();
324                    let component_channel = self.component_channels
325                        .entry(component_kind)
326                        .or_insert_with(RemoteComponentChannel::new);
327
328                    component_channel.accept_message(self.state, id, msg);
329                    component_channel.drain_messages_into(&component_kind, &mut self.incoming_messages);
330                }
331                EntityMessageType::Publish | EntityMessageType::Unpublish |
332                EntityMessageType::EnableDelegation | EntityMessageType::DisableDelegation |
333                EntityMessageType::ReleaseAuthority | // NOTE: This should be possible because a client might want to release authority right after enabling delegation
334                EntityMessageType::SetAuthority => {
335                    let (id, msg) = self.buffered_messages.pop_front().unwrap();
336                    // info!("EntityChannelReceiver::process_messages(id={}, msgType={:?})", id, msg.get_type());
337
338                    self.auth_channel.receiver_receive_message(Some(self.state), id, msg);
339                    // Only drain auth messages when entity is Spawned (spawn barrier contract)
340                    if self.state == EntityChannelState::Spawned {
341                        self.auth_channel.receiver_drain_messages_into(&mut self.incoming_messages);
342                    }
343                    // When Despawned, message stays buffered in auth_channel until Spawn arm drains it
344                }
345                EntityMessageType::Noop => {
346                    // Drop it
347                }
348                msg => {
349                    panic!("EntityChannel::accept_message() received an unexpected message type: {:?}", msg);
350                }
351            }
352        }
353    }
354
355    fn pop_front_into_outgoing(&mut self) {
356        let (_, msg) = self.buffered_messages.pop_front().unwrap();
357        self.incoming_messages.push(msg);
358    }
359
360    #[allow(dead_code)] // used in migration unit tests
361    pub(crate) fn get_state(&self) -> EntityChannelState {
362        self.state
363    }
364
365    #[cfg(feature = "e2e_debug")]
366    pub(crate) fn debug_auth_diagnostic(
367        &self,
368    ) -> (
369        EntityChannelState,
370        (SubCommandId, usize, Option<SubCommandId>, usize),
371    ) {
372        let auth_diag = self.auth_channel.receiver_debug_diagnostic();
373        (self.state, auth_diag)
374    }
375
376    #[cfg(feature = "e2e_debug")]
377    pub(crate) fn debug_channel_snapshot(
378        &self,
379    ) -> (
380        EntityChannelState,
381        Option<MessageIndex>,
382        usize,
383        Option<(MessageIndex, EntityMessageType)>,
384        Option<MessageIndex>,
385    ) {
386        let state = self.state;
387        let last_epoch_id = self.last_epoch_id;
388        let buffered_len = self.buffered_messages.len();
389        let head = self
390            .buffered_messages
391            .peek_front()
392            .map(|(id, msg)| (*id, msg.get_type()));
393        let spawn_id = self
394            .buffered_messages
395            .find_by_predicate(|msg| msg.get_type() == EntityMessageType::Spawn)
396            .map(|(id, _)| id);
397        (state, last_epoch_id, buffered_len, head, spawn_id)
398    }
399
400    pub(crate) fn extract_inserted_component_kinds(&self) -> HashSet<ComponentKind> {
401        self.component_channels
402            .iter()
403            .filter(|(_, channel)| channel.is_inserted())
404            .map(|(kind, _)| *kind)
405            .collect()
406    }
407
408    pub(crate) fn force_drain_all_buffers(&mut self) {
409        // Force-drain entity-level buffered messages
410        while let Some((_, msg)) = self.buffered_messages.pop_front() {
411            self.incoming_messages.push(msg);
412        }
413
414        // Force-drain all component channels
415        for (_, component_channel) in self.component_channels.iter_mut() {
416            component_channel.force_drain_buffers(self.state);
417        }
418    }
419
420    pub(crate) fn insert_component(&mut self, component_kind: ComponentKind) {
421        self.component_channels.entry(component_kind).or_insert_with(RemoteComponentChannel::new);
422    }
423
424    pub(crate) fn remove_component(&mut self, component_kind: ComponentKind) {
425        self.component_channels.remove(&component_kind);
426    }
427
428    pub(crate) fn set_spawned(&mut self, epoch_id: MessageIndex) {
429        if self.state != EntityChannelState::Despawned {
430            panic!("Can only set spawned on despawned entity");
431        }
432        self.state = EntityChannelState::Spawned;
433        self.last_epoch_id = Some(epoch_id);
434    }
435
436    pub(crate) fn insert_component_channel_as_inserted(
437        &mut self,
438        component_kind: ComponentKind,
439        epoch_id: MessageIndex,
440    ) {
441        let mut comp_channel = RemoteComponentChannel::new();
442        comp_channel.set_inserted(true, epoch_id);
443        self.component_channels.insert(component_kind, comp_channel);
444    }
445
446    #[allow(dead_code)] // used in bulletproof migration unit tests
447    pub(crate) fn take_incoming_events(&mut self) -> Vec<EntityMessage<()>> {
448        std::mem::take(&mut self.incoming_messages)
449    }
450}
451
452use crate::world::sync::auth_channel::AuthChannel;
453use crate::world::sync::ordered_ids::OrderedIds;
454