1use std::{
78 collections::{HashMap, HashSet},
79 hash::Hash,
80};
81
82use crate::{
83 sequence_less_than, world::sync::remote_component_channel::RemoteComponentChannel,
84 ComponentKind, EntityAuthStatus, EntityCommand, EntityMessage, EntityMessageType, HostType,
85 MessageIndex,
86};
87
88cfg_if! {
89 if #[cfg(feature = "e2e_debug")] {
90 use crate::world::host::host_world_manager::SubCommandId;
91 }
92}
93
/// Lifecycle state of a remote entity as tracked by its [`RemoteEntityChannel`].
///
/// A freshly constructed channel starts out as [`EntityChannelState::Despawned`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "e2e_debug", allow(dead_code))]
pub enum EntityChannelState {
    /// The entity does not currently exist on this side; a spawn message is
    /// required before the channel moves to `Spawned`.
    Despawned,
    /// The entity exists; a despawn message moves the channel back to
    /// `Despawned`.
    Spawned,
}
103
104pub struct RemoteEntityChannel {
106 state: EntityChannelState,
107 last_epoch_id: Option<MessageIndex>,
108
109 component_channels: HashMap<ComponentKind, RemoteComponentChannel>,
110 auth_channel: AuthChannel,
111
112 buffered_messages: OrderedIds<EntityMessage<()>>,
113 incoming_messages: Vec<EntityMessage<()>>,
114 outgoing_commands: Vec<EntityCommand>,
115}
116
117impl RemoteEntityChannel {
118 pub fn new(host_type: HostType) -> Self {
120 Self {
121 state: EntityChannelState::Despawned,
122 last_epoch_id: None,
123
124 component_channels: HashMap::new(),
125 auth_channel: AuthChannel::new(host_type),
126
127 buffered_messages: OrderedIds::new(),
128 incoming_messages: Vec::new(),
129 outgoing_commands: Vec::new(),
130 }
131 }
132
133 pub fn new_delegated(host_type: HostType) -> Self {
138 let mut channel = Self::new(host_type);
139 channel.configure_as_delegated();
140 channel
141 }
142
143 pub fn configure_as_delegated(&mut self) {
145 self.auth_channel.force_publish();
148 self.auth_channel.force_enable_delegation();
149 self.auth_channel.receiver_set_next_subcommand_id(1);
151 }
152
153 pub fn update_auth_status(&mut self, auth_status: EntityAuthStatus) {
155 self.auth_channel.force_set_auth_status(auth_status);
156 }
157
158 pub fn auth_status(&self) -> Option<EntityAuthStatus> {
160 self.auth_channel.auth_status()
161 }
162
163 pub fn is_delegated(&self) -> bool {
165 self.auth_channel.is_delegated()
166 }
167
168 pub(crate) fn receive_message(&mut self, id: MessageIndex, msg: EntityMessage<()>) {
169 if let Some(last_epoch_id) = self.last_epoch_id {
170 if last_epoch_id == id {
171 panic!("EntityChannel received a message with the same id as the last epoch id. This should not happen. Message: {:?}", msg);
172 }
173
174 if sequence_less_than(id, last_epoch_id) {
175 return;
177 }
178 }
179
180 self.buffered_messages.push_back(id, msg);
181
182 self.process_messages();
183 }
184
185 pub fn send_command(&mut self, command: EntityCommand) {
187 self.auth_channel.send_command(command);
188 self.auth_channel
189 .sender_drain_messages_into(&mut self.outgoing_commands);
190 }
191
192 pub(crate) fn drain_incoming_messages_into<E: Copy + Hash + Eq>(
193 &mut self,
194 entity: E,
195 outgoing_events: &mut Vec<EntityMessage<E>>,
196 ) {
197 let mut received_messages = Vec::new();
199 for rmsg in std::mem::take(&mut self.incoming_messages) {
200 received_messages.push(rmsg.with_entity(entity));
201 }
202 outgoing_events.append(&mut received_messages);
203 }
204
205 pub(crate) fn drain_outgoing_messages_into(
206 &mut self,
207 outgoing_commands: &mut Vec<EntityCommand>,
208 ) {
209 outgoing_commands.append(&mut self.outgoing_commands);
210 }
211
212 pub(crate) fn has_component_kind(&self, component_kind: &ComponentKind) -> bool {
213 self.component_channels.contains_key(component_kind)
214 }
215
216 fn process_messages(&mut self) {
217 loop {
218 let Some((id, msg)) = self.buffered_messages.peek_front() else {
219 break;
220 };
221 let id = *id;
222
223 match msg.get_type() {
224 EntityMessageType::Spawn => {
225 if self.state != EntityChannelState::Despawned {
226 break;
227 }
228
229 self.state = EntityChannelState::Spawned;
230 #[cfg(feature = "e2e_debug")]
232 {
233 extern "Rust" {
234 fn client_processed_spawn_increment();
235 }
236 unsafe {
239 client_processed_spawn_increment();
240 }
241 }
242 self.last_epoch_id = Some(id);
243 self.buffered_messages.pop_front_until_and_excluding(id);
245
246 self.pop_front_into_outgoing();
247
248 self.auth_channel.receiver_buffer_pop_front_until_and_including(id);
250
251 self.auth_channel.receiver_process_messages(self.state);
252 self.auth_channel.receiver_drain_messages_into(&mut self.incoming_messages);
253
254 for (component_kind, component_channel) in self.component_channels.iter_mut() {
258 component_channel.buffer_pop_front_until_and_excluding(id);
259 component_channel.process_messages(self.state);
260 component_channel.drain_messages_into(component_kind, &mut self.incoming_messages);
261 }
262 }
263 EntityMessageType::SpawnWithComponents => {
264 if self.state != EntityChannelState::Despawned {
265 break;
266 }
267
268 self.state = EntityChannelState::Spawned;
269 self.last_epoch_id = Some(id);
270 self.buffered_messages.pop_front_until_and_excluding(id);
272
273 let (_, msg) = self.buffered_messages.pop_front().unwrap();
275 let kinds = match msg {
276 EntityMessage::SpawnWithComponents((), kinds) => kinds,
277 _ => unreachable!(),
278 };
279
280 self.incoming_messages.push(EntityMessage::Spawn(()));
282
283 for (component_kind, component_channel) in self.component_channels.iter_mut() {
285 component_channel.buffer_pop_front_until_and_excluding(id);
286 component_channel.process_messages(self.state);
287 component_channel.drain_messages_into(component_kind, &mut self.incoming_messages);
288 }
289
290 for kind in &kinds {
292 let component_channel = self.component_channels
293 .entry(*kind)
294 .or_insert_with(RemoteComponentChannel::new);
295 component_channel.set_inserted(true, id);
296 self.incoming_messages.push(EntityMessage::InsertComponent((), *kind));
297 }
298
299 self.auth_channel.receiver_buffer_pop_front_until_and_including(id);
301 self.auth_channel.receiver_process_messages(self.state);
302 self.auth_channel.receiver_drain_messages_into(&mut self.incoming_messages);
303 }
304 EntityMessageType::Despawn => {
305 if self.state != EntityChannelState::Spawned {
306 break;
307 }
308
309 self.state = EntityChannelState::Despawned;
310 self.last_epoch_id = Some(id);
311
312 self.auth_channel.reset();
313 self.component_channels.clear();
314
315 self.pop_front_into_outgoing();
316
317 self.buffered_messages.clear();
319 }
320 EntityMessageType::InsertComponent | EntityMessageType::RemoveComponent => {
321
322 let (id, msg) = self.buffered_messages.pop_front().unwrap();
323 let component_kind = msg.component_kind().unwrap();
324 let component_channel = self.component_channels
325 .entry(component_kind)
326 .or_insert_with(RemoteComponentChannel::new);
327
328 component_channel.accept_message(self.state, id, msg);
329 component_channel.drain_messages_into(&component_kind, &mut self.incoming_messages);
330 }
331 EntityMessageType::Publish | EntityMessageType::Unpublish |
332 EntityMessageType::EnableDelegation | EntityMessageType::DisableDelegation |
333 EntityMessageType::ReleaseAuthority | EntityMessageType::SetAuthority => {
335 let (id, msg) = self.buffered_messages.pop_front().unwrap();
336 self.auth_channel.receiver_receive_message(Some(self.state), id, msg);
339 if self.state == EntityChannelState::Spawned {
341 self.auth_channel.receiver_drain_messages_into(&mut self.incoming_messages);
342 }
343 }
345 EntityMessageType::Noop => {
346 }
348 msg => {
349 panic!("EntityChannel::accept_message() received an unexpected message type: {:?}", msg);
350 }
351 }
352 }
353 }
354
355 fn pop_front_into_outgoing(&mut self) {
356 let (_, msg) = self.buffered_messages.pop_front().unwrap();
357 self.incoming_messages.push(msg);
358 }
359
360 #[allow(dead_code)] pub(crate) fn get_state(&self) -> EntityChannelState {
362 self.state
363 }
364
365 #[cfg(feature = "e2e_debug")]
366 pub(crate) fn debug_auth_diagnostic(
367 &self,
368 ) -> (
369 EntityChannelState,
370 (SubCommandId, usize, Option<SubCommandId>, usize),
371 ) {
372 let auth_diag = self.auth_channel.receiver_debug_diagnostic();
373 (self.state, auth_diag)
374 }
375
376 #[cfg(feature = "e2e_debug")]
377 pub(crate) fn debug_channel_snapshot(
378 &self,
379 ) -> (
380 EntityChannelState,
381 Option<MessageIndex>,
382 usize,
383 Option<(MessageIndex, EntityMessageType)>,
384 Option<MessageIndex>,
385 ) {
386 let state = self.state;
387 let last_epoch_id = self.last_epoch_id;
388 let buffered_len = self.buffered_messages.len();
389 let head = self
390 .buffered_messages
391 .peek_front()
392 .map(|(id, msg)| (*id, msg.get_type()));
393 let spawn_id = self
394 .buffered_messages
395 .find_by_predicate(|msg| msg.get_type() == EntityMessageType::Spawn)
396 .map(|(id, _)| id);
397 (state, last_epoch_id, buffered_len, head, spawn_id)
398 }
399
400 pub(crate) fn extract_inserted_component_kinds(&self) -> HashSet<ComponentKind> {
401 self.component_channels
402 .iter()
403 .filter(|(_, channel)| channel.is_inserted())
404 .map(|(kind, _)| *kind)
405 .collect()
406 }
407
408 pub(crate) fn force_drain_all_buffers(&mut self) {
409 while let Some((_, msg)) = self.buffered_messages.pop_front() {
411 self.incoming_messages.push(msg);
412 }
413
414 for (_, component_channel) in self.component_channels.iter_mut() {
416 component_channel.force_drain_buffers(self.state);
417 }
418 }
419
420 pub(crate) fn insert_component(&mut self, component_kind: ComponentKind) {
421 self.component_channels.entry(component_kind).or_insert_with(RemoteComponentChannel::new);
422 }
423
424 pub(crate) fn remove_component(&mut self, component_kind: ComponentKind) {
425 self.component_channels.remove(&component_kind);
426 }
427
428 pub(crate) fn set_spawned(&mut self, epoch_id: MessageIndex) {
429 if self.state != EntityChannelState::Despawned {
430 panic!("Can only set spawned on despawned entity");
431 }
432 self.state = EntityChannelState::Spawned;
433 self.last_epoch_id = Some(epoch_id);
434 }
435
436 pub(crate) fn insert_component_channel_as_inserted(
437 &mut self,
438 component_kind: ComponentKind,
439 epoch_id: MessageIndex,
440 ) {
441 let mut comp_channel = RemoteComponentChannel::new();
442 comp_channel.set_inserted(true, epoch_id);
443 self.component_channels.insert(component_kind, comp_channel);
444 }
445
446 #[allow(dead_code)] pub(crate) fn take_incoming_events(&mut self) -> Vec<EntityMessage<()>> {
448 std::mem::take(&mut self.incoming_messages)
449 }
450}
451
452use crate::world::sync::auth_channel::AuthChannel;
453use crate::world::sync::ordered_ids::OrderedIds;
454