naia_shared/world/entity/
entity_action_receiver.rs1use std::{
2 collections::{HashMap, VecDeque},
3 hash::Hash,
4 marker::PhantomData,
5};
6
7use crate::{
8 messages::channels::receivers::reliable_receiver::ReliableReceiver, sequence_less_than,
9 world::component::component_kinds::ComponentKind, EntityAction, MessageIndex as ActionIndex,
10};
11
12pub struct EntityActionReceiver<E: Copy + Hash + Eq> {
13 receiver: ReliableReceiver<EntityAction<E>>,
14 entity_channels: HashMap<E, EntityChannel<E>>,
15}
16
17impl<E: Copy + Hash + Eq> EntityActionReceiver<E> {
18 pub fn new() -> Self {
19 Self {
20 receiver: ReliableReceiver::new(),
21 entity_channels: HashMap::default(),
22 }
23 }
24
25 pub fn track_hosts_redundant_remote_entity(
26 &mut self,
27 entity: &E,
28 component_kinds: &Vec<ComponentKind>,
29 ) {
30 let mut entity_channel = EntityChannel::new(*entity);
31 entity_channel.spawned = true;
32 for component_kind in component_kinds {
33 entity_channel
34 .components
35 .insert(*component_kind, ComponentChannel::new(None));
36 }
37 self.entity_channels.insert(*entity, entity_channel);
38 }
39
40 pub fn untrack_hosts_redundant_remote_entity(&mut self, entity: &E) {
41 self.entity_channels.remove(entity);
42 }
43
44 pub fn buffer_action(&mut self, action_index: ActionIndex, action: EntityAction<E>) {
46 self.receiver.buffer_message(action_index, action);
47 }
48
49 pub fn receive_actions(&mut self) -> Vec<EntityAction<E>> {
54 let mut outgoing_actions = Vec::new();
55 let incoming_actions = self.receiver.receive_messages();
56 for (action_index, action) in incoming_actions {
57 if let Some(entity) = action.entity() {
58 self.entity_channels
59 .entry(entity)
60 .or_insert_with(|| EntityChannel::new(entity));
61 let entity_channel = self.entity_channels.get_mut(&entity).unwrap();
62 entity_channel.receive_action(action_index, action, &mut outgoing_actions);
63 }
64 }
65
66 outgoing_actions
72 }
73}
74
75struct EntityChannel<E: Copy + Hash + Eq> {
77 entity: E,
78 last_canonical_index: Option<ActionIndex>,
79 spawned: bool,
80 components: HashMap<ComponentKind, ComponentChannel<E>>,
81 waiting_spawns: OrderedIds<Vec<ComponentKind>>,
82 waiting_despawns: OrderedIds<()>,
83}
84
85impl<E: Copy + Hash + Eq> EntityChannel<E> {
86 pub fn new(entity: E) -> Self {
87 Self {
88 entity,
89 spawned: false,
90 components: HashMap::new(),
91 waiting_spawns: OrderedIds::new(),
92 waiting_despawns: OrderedIds::new(),
93 last_canonical_index: None,
94 }
95 }
96
97 pub fn receive_action(
107 &mut self,
108 incoming_action_index: ActionIndex,
109 incoming_action: EntityAction<E>,
110 outgoing_actions: &mut Vec<EntityAction<E>>,
111 ) {
112 match incoming_action {
113 EntityAction::SpawnEntity(_, components) => {
114 self.receive_spawn_entity_action(
115 incoming_action_index,
116 components,
117 outgoing_actions,
118 );
119 }
120 EntityAction::DespawnEntity(_) => {
121 self.receive_despawn_entity_action(incoming_action_index, outgoing_actions);
122 }
123 EntityAction::InsertComponent(_, component) => {
124 self.receive_insert_component_action(
125 incoming_action_index,
126 component,
127 outgoing_actions,
128 );
129 }
130 EntityAction::RemoveComponent(_, component) => {
131 self.receive_remove_component_action(
132 incoming_action_index,
133 component,
134 outgoing_actions,
135 );
136 }
137 EntityAction::Noop => {}
138 }
139 }
140
141 pub fn receive_spawn_entity_action(
145 &mut self,
146 action_index: ActionIndex,
147 components: Vec<ComponentKind>,
148 outgoing_actions: &mut Vec<EntityAction<E>>,
149 ) {
150 if let Some(last_index) = self.last_canonical_index {
158 if sequence_less_than(action_index, last_index) {
159 return;
160 }
161 }
162
163 if !self.spawned {
164 self.spawned = true;
165 outgoing_actions.push(EntityAction::SpawnEntity(self.entity, components));
166
167 self.receive_canonical(action_index);
169
170 if let Some((despawn_index, _)) = self.waiting_despawns.inner.pop_front() {
172 self.receive_despawn_entity_action(despawn_index, outgoing_actions);
173 } else {
174 let mut inserted_components = Vec::new();
176 for (component, component_state) in &mut self.components {
177 if let Some(insert_index) = component_state.waiting_inserts.inner.pop_front() {
178 inserted_components.push((insert_index, *component));
179 }
180 }
181
182 for ((index, _), component) in inserted_components {
183 self.receive_insert_component_action(index, component, outgoing_actions);
184 }
185 }
186 } else {
187 self.waiting_spawns.push_back(action_index, components);
189 }
190 }
191
192 pub fn receive_despawn_entity_action(
196 &mut self,
197 index: ActionIndex,
198 outgoing_actions: &mut Vec<EntityAction<E>>,
199 ) {
200 if let Some(last_index) = self.last_canonical_index {
202 if sequence_less_than(index, last_index) {
203 return;
204 }
205 }
206
207 if self.spawned {
208 self.spawned = false;
209 outgoing_actions.push(EntityAction::DespawnEntity(self.entity));
210
211 self.receive_canonical(index);
213
214 for value in self.components.values_mut() {
216 value.inserted = false;
217 }
218
219 if let Some((spawn_index, components)) = self.waiting_spawns.inner.pop_front() {
221 self.receive_spawn_entity_action(spawn_index, components, outgoing_actions);
222 }
223 } else {
224 self.waiting_despawns.push_back(index, ());
226 }
227 }
228
229 pub fn receive_insert_component_action(
230 &mut self,
231 index: ActionIndex,
232 component: ComponentKind,
233 outgoing_actions: &mut Vec<EntityAction<E>>,
234 ) {
235 if let Some(last_index) = self.last_canonical_index {
237 if sequence_less_than(index, last_index) {
238 return;
239 }
240 }
241
242 if let std::collections::hash_map::Entry::Vacant(e) = self.components.entry(component) {
243 e.insert(ComponentChannel::new(self.last_canonical_index));
244 }
245 let component_state = self.components.get_mut(&component).unwrap();
246
247 if let Some(last_index) = component_state.last_canonical_index {
250 if sequence_less_than(index, last_index) {
251 return;
252 }
253 }
254
255 if !component_state.inserted {
256 component_state.inserted = true;
257 outgoing_actions.push(EntityAction::InsertComponent(self.entity, component));
258
259 component_state.receive_canonical(index);
262
263 if let Some((remove_index, _)) = component_state.waiting_removes.inner.pop_front() {
265 self.receive_remove_component_action(remove_index, component, outgoing_actions);
266 }
267 } else {
268 component_state.waiting_inserts.push_back(index, ());
270 }
271 }
272
273 pub fn receive_remove_component_action(
274 &mut self,
275 index: ActionIndex,
276 component: ComponentKind,
277 outgoing_actions: &mut Vec<EntityAction<E>>,
278 ) {
279 if let Some(last_index) = self.last_canonical_index {
281 if sequence_less_than(index, last_index) {
282 return;
283 }
284 }
285
286 if let std::collections::hash_map::Entry::Vacant(e) = self.components.entry(component) {
287 e.insert(ComponentChannel::new(self.last_canonical_index));
288 }
289 let component_state = self.components.get_mut(&component).unwrap();
290
291 if let Some(last_index) = component_state.last_canonical_index {
294 if sequence_less_than(index, last_index) {
295 return;
296 }
297 }
298
299 if component_state.inserted {
300 component_state.inserted = false;
301 outgoing_actions.push(EntityAction::RemoveComponent(self.entity, component));
302
303 component_state.receive_canonical(index);
306
307 if let Some((insert_index, _)) = component_state.waiting_inserts.inner.pop_front() {
309 self.receive_insert_component_action(insert_index, component, outgoing_actions);
310 }
311 } else {
312 component_state.waiting_removes.push_back(index, ());
314 }
315 }
316
317 pub fn receive_canonical(&mut self, index: ActionIndex) {
318 self.waiting_spawns.pop_front_until_and_including(index);
320 self.waiting_despawns.pop_front_until_and_including(index);
321 for component_state in self.components.values_mut() {
322 component_state.receive_canonical(index);
323 }
324
325 self.last_canonical_index = Some(index);
326 }
327}
328
329pub struct ComponentChannel<E: Copy + Hash + Eq> {
333 pub inserted: bool,
334 pub last_canonical_index: Option<ActionIndex>,
335 pub waiting_inserts: OrderedIds<()>,
336 pub waiting_removes: OrderedIds<()>,
337
338 phantom_e: PhantomData<E>,
339}
340
341impl<E: Copy + Hash + Eq> ComponentChannel<E> {
342 pub fn new(canonical_index: Option<ActionIndex>) -> Self {
343 Self {
344 inserted: false,
345 waiting_inserts: OrderedIds::new(),
346 waiting_removes: OrderedIds::new(),
347 last_canonical_index: canonical_index,
348
349 phantom_e: PhantomData,
350 }
351 }
352
353 pub fn receive_canonical(&mut self, index: ActionIndex) {
354 self.waiting_inserts.pop_front_until_and_including(index);
356 self.waiting_removes.pop_front_until_and_including(index);
357
358 self.last_canonical_index = Some(index);
359 }
360}
361
362pub struct OrderedIds<P> {
363 inner: VecDeque<(ActionIndex, P)>,
365}
366
367impl<P> OrderedIds<P> {
368 pub fn new() -> Self {
369 Self {
370 inner: VecDeque::new(),
371 }
372 }
373
374 pub fn push_back(&mut self, action_index: ActionIndex, item: P) {
394 let mut current_index = self.inner.len();
395
396 loop {
397 if current_index == 0 {
398 self.inner.push_front((action_index, item));
399 return;
400 }
401
402 current_index -= 1;
403
404 let (old_index, _) = self.inner.get(current_index).unwrap();
405 if sequence_less_than(*old_index, action_index) {
406 self.inner.insert(current_index + 1, (action_index, item));
407 return;
408 }
409 }
410 }
411
412 pub fn pop_front_until_and_including(&mut self, index: ActionIndex) {
413 let mut pop = false;
414
415 if let Some((old_index, _)) = self.inner.front() {
416 if *old_index == index || sequence_less_than(*old_index, index) {
417 pop = true;
418 }
419 }
420
421 if pop {
422 self.inner.pop_front();
423 }
424 }
425}