Skip to main content

naia_shared/world/sync/
host_entity_channel.rs

1use std::collections::HashSet;
2
3use crate::{
4    world::sync::{auth_channel::AuthChannel, ordered_ids::OrderedIds},
5    ComponentKind, EntityCommand, EntityMessage, EntityMessageType, HostEntity, HostType,
6    MessageIndex,
7};
8
9/// Outbound state machine for a single host-owned entity, tracking its component set and authority sub-channel.
10pub struct HostEntityChannel {
11    component_channels: HashSet<ComponentKind>,
12    auth_channel: AuthChannel,
13
14    buffered_messages: OrderedIds<EntityMessage<()>>,
15    incoming_messages: Vec<EntityMessage<()>>,
16    outgoing_commands: Vec<EntityCommand>,
17}
18
19impl HostEntityChannel {
20    /// Creates a fresh `HostEntityChannel` with no components and default auth state for `host_type`.
21    pub fn new(host_type: HostType) -> Self {
22        Self {
23            component_channels: HashSet::new(),
24            auth_channel: AuthChannel::new(host_type),
25
26            buffered_messages: OrderedIds::new(),
27            incoming_messages: Vec::new(),
28            outgoing_commands: Vec::new(),
29        }
30    }
31
32    pub(crate) fn component_kinds(&self) -> &HashSet<ComponentKind> {
33        &self.component_channels
34    }
35
36    /// Validates and routes `command` to the component set or authority sub-channel, queuing it for outbound delivery.
37    pub fn send_command(&mut self, command: EntityCommand) {
38        match command.get_type() {
39            EntityMessageType::Spawn
40            | EntityMessageType::SpawnWithComponents
41            | EntityMessageType::Despawn
42            | EntityMessageType::Noop => {
43                panic!("These should be handled by the Engine, not the EntityChannelSender");
44            }
45            EntityMessageType::InsertComponent => {
46                let component_kind = command.component_kind().unwrap();
47                if self.component_channels.contains(&component_kind) {
48                    panic!("Cannot insert a component that already exists in the entity channel");
49                }
50                self.component_channels.insert(component_kind);
51                self.outgoing_commands.push(command);
52            }
53            EntityMessageType::RemoveComponent => {
54                let component_kind = command.component_kind().unwrap();
55                if !self.component_channels.contains(&component_kind) {
56                    panic!("Cannot remove a component that does not exist in the entity channel");
57                }
58                self.component_channels.remove(&component_kind);
59                self.outgoing_commands.push(command);
60            }
61            EntityMessageType::Publish
62            | EntityMessageType::Unpublish
63            | EntityMessageType::EnableDelegation
64            | EntityMessageType::DisableDelegation
65            | EntityMessageType::SetAuthority
66            | EntityMessageType::RequestAuthority
67            | EntityMessageType::ReleaseAuthority
68            | EntityMessageType::EnableDelegationResponse
69            | EntityMessageType::MigrateResponse => {
70                self.auth_channel.validate_command(&command);
71                self.auth_channel.send_command(command);
72                self.auth_channel
73                    .sender_drain_messages_into(&mut self.outgoing_commands);
74            }
75        }
76    }
77
78    pub(crate) fn drain_incoming_messages_into(
79        &mut self,
80        entity: HostEntity,
81        outgoing_events: &mut Vec<EntityMessage<HostEntity>>,
82    ) {
83        // Drain the entity channel and append the messages to the outgoing events
84        let mut received_messages = Vec::new();
85        for rmsg in std::mem::take(&mut self.incoming_messages) {
86            // info!("EntityChannelSender::drain_incoming_messages_into(entity={:?}, msgType={:?})", entity, rmsg.get_type());
87
88            received_messages.push(rmsg.with_entity(entity));
89        }
90        outgoing_events.append(&mut received_messages);
91    }
92
93    pub(crate) fn drain_outgoing_messages_into(
94        &mut self,
95        outgoing_commands: &mut Vec<EntityCommand>,
96    ) {
97        outgoing_commands.append(&mut self.outgoing_commands);
98    }
99
100    pub(crate) fn receive_message(&mut self, id: MessageIndex, msg: EntityMessage<()>) {
101        self.buffered_messages.push_back(id, msg);
102        self.process_messages();
103    }
104
105    fn process_messages(&mut self) {
106        loop {
107            let Some((_id, msg)) = self.buffered_messages.peek_front() else {
108                break;
109            };
110
111            match msg.get_type() {
112                EntityMessageType::RequestAuthority
113                | EntityMessageType::ReleaseAuthority
114                | EntityMessageType::EnableDelegationResponse
115                | EntityMessageType::MigrateResponse => {
116                    let (id, msg) = self.buffered_messages.pop_front().unwrap();
117
118                    // info!("EntityChannelSender::process_messages(id={}, msgType={:?})", id, msg.get_type());
119
120                    self.auth_channel.receiver_receive_message(None, id, msg);
121                    self.auth_channel
122                        .receiver_drain_messages_into(&mut self.incoming_messages);
123                }
124                EntityMessageType::Noop => {
125                    // Drop it
126                }
127                msg => {
128                    panic!("EntityChannelSender::process_messages() received an unexpected message type: {:?}", msg);
129                }
130            }
131        }
132    }
133
134    pub(crate) fn new_with_components(
135        host_type: HostType,
136        component_kinds: HashSet<ComponentKind>,
137    ) -> Self {
138        Self {
139            component_channels: component_kinds,
140            auth_channel: AuthChannel::new(host_type),
141            buffered_messages: OrderedIds::new(),
142            incoming_messages: Vec::new(),
143            outgoing_commands: Vec::new(),
144        }
145    }
146
147    /// Drains and returns all queued outbound [`EntityCommand`]s.
148    pub fn extract_outgoing_commands(&mut self) -> Vec<EntityCommand> {
149        std::mem::take(&mut self.outgoing_commands)
150    }
151
152    /// Force-enable delegation on this channel (client-side only)
153    /// This is called when the client originates an EnableDelegation message,
154    /// to ensure the local channel is in the correct state to receive MigrateResponse
155    pub fn local_enable_delegation(&mut self) {
156        // Must publish first before enabling delegation
157        self.auth_channel.force_publish();
158        self.auth_channel.force_enable_delegation();
159    }
160
161    /// Returns `true` if this channel's authority sub-channel is in the Delegated state.
162    pub fn is_delegated(&self) -> bool {
163        self.auth_channel.is_delegated()
164    }
165
166    /// Returns the current publication/delegation state of this channel's authority sub-channel.
167    pub fn auth_channel_state(&self) -> crate::world::sync::auth_channel::EntityAuthChannelState {
168        self.auth_channel.state()
169    }
170}