naia_shared/world/entity/
entity_action_receiver.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    hash::Hash,
4    marker::PhantomData,
5};
6
7use crate::{
8    messages::channels::receivers::reliable_receiver::ReliableReceiver, sequence_less_than,
9    world::component::component_kinds::ComponentKind, EntityAction, MessageIndex as ActionIndex,
10};
11
12pub struct EntityActionReceiver<E: Copy + Hash + Eq> {
13    receiver: ReliableReceiver<EntityAction<E>>,
14    entity_channels: HashMap<E, EntityChannel<E>>,
15}
16
17impl<E: Copy + Hash + Eq> EntityActionReceiver<E> {
18    pub fn new() -> Self {
19        Self {
20            receiver: ReliableReceiver::new(),
21            entity_channels: HashMap::default(),
22        }
23    }
24
25    pub fn track_hosts_redundant_remote_entity(
26        &mut self,
27        entity: &E,
28        component_kinds: &Vec<ComponentKind>,
29    ) {
30        let mut entity_channel = EntityChannel::new(*entity);
31        entity_channel.spawned = true;
32        for component_kind in component_kinds {
33            entity_channel
34                .components
35                .insert(*component_kind, ComponentChannel::new(None));
36        }
37        self.entity_channels.insert(*entity, entity_channel);
38    }
39
40    pub fn untrack_hosts_redundant_remote_entity(&mut self, entity: &E) {
41        self.entity_channels.remove(entity);
42    }
43
44    /// Buffer a read [`EntityAction`] so that it can be processed later
45    pub fn buffer_action(&mut self, action_index: ActionIndex, action: EntityAction<E>) {
46        self.receiver.buffer_message(action_index, action);
47    }
48
49    /// Read all buffered [`EntityAction`] inside the `receiver` and process them.
50    ///
51    /// Outputs the list of [`EntityAction`] that can be executed now, buffer the rest
52    /// into each entity's [`EntityChannel`]
53    pub fn receive_actions(&mut self) -> Vec<EntityAction<E>> {
54        let mut outgoing_actions = Vec::new();
55        let incoming_actions = self.receiver.receive_messages();
56        for (action_index, action) in incoming_actions {
57            if let Some(entity) = action.entity() {
58                self.entity_channels
59                    .entry(entity)
60                    .or_insert_with(|| EntityChannel::new(entity));
61                let entity_channel = self.entity_channels.get_mut(&entity).unwrap();
62                entity_channel.receive_action(action_index, action, &mut outgoing_actions);
63            }
64        }
65
66        // TODO: VERY IMPORTANT! You need to figure out how to remove EntityChannels after they've been despawned!
67        // keep in mind that you need to keep around entity channels to be able to receive messages for them still
68        // RIGHT NOW THIS IS LEAKING MEMORY!
69        // a TTL for these Entity Channels after they've been despawned is probably the way to go
70
71        outgoing_actions
72    }
73}
74
75// Entity Channel
76struct EntityChannel<E: Copy + Hash + Eq> {
77    entity: E,
78    last_canonical_index: Option<ActionIndex>,
79    spawned: bool,
80    components: HashMap<ComponentKind, ComponentChannel<E>>,
81    waiting_spawns: OrderedIds<Vec<ComponentKind>>,
82    waiting_despawns: OrderedIds<()>,
83}
84
85impl<E: Copy + Hash + Eq> EntityChannel<E> {
86    pub fn new(entity: E) -> Self {
87        Self {
88            entity,
89            spawned: false,
90            components: HashMap::new(),
91            waiting_spawns: OrderedIds::new(),
92            waiting_despawns: OrderedIds::new(),
93            last_canonical_index: None,
94        }
95    }
96
97    /// Process the provided [`EntityAction`]:
98    ///
99    /// * Checks that [`EntityAction`] can be executed now
100    /// * If so, add it to `outgoing_actions`
101    /// * Else, add it to internal "waiting" buffers so we can check when the [`EntityAction`]
102    ///   can be executed
103    ///
104    /// ([`EntityAction`]s might not be executable now, for example is an InsertComponent
105    ///  is processed before the corresponding entity has been spawned)
106    pub fn receive_action(
107        &mut self,
108        incoming_action_index: ActionIndex,
109        incoming_action: EntityAction<E>,
110        outgoing_actions: &mut Vec<EntityAction<E>>,
111    ) {
112        match incoming_action {
113            EntityAction::SpawnEntity(_, components) => {
114                self.receive_spawn_entity_action(
115                    incoming_action_index,
116                    components,
117                    outgoing_actions,
118                );
119            }
120            EntityAction::DespawnEntity(_) => {
121                self.receive_despawn_entity_action(incoming_action_index, outgoing_actions);
122            }
123            EntityAction::InsertComponent(_, component) => {
124                self.receive_insert_component_action(
125                    incoming_action_index,
126                    component,
127                    outgoing_actions,
128                );
129            }
130            EntityAction::RemoveComponent(_, component) => {
131                self.receive_remove_component_action(
132                    incoming_action_index,
133                    component,
134                    outgoing_actions,
135                );
136            }
137            EntityAction::Noop => {}
138        }
139    }
140
141    /// Process the entity action.
142    /// When the entity is actually spawned on the client, send back an ack event
143    /// to the server.
144    pub fn receive_spawn_entity_action(
145        &mut self,
146        action_index: ActionIndex,
147        components: Vec<ComponentKind>,
148        outgoing_actions: &mut Vec<EntityAction<E>>,
149    ) {
150        // this is the problem:
151        // the point of the receiver is to de-dup a given event, like a Spawn Action here
152        // we only only convert the NEWEST spawn packet into a SpawnAction
153        // so the problem we're running into is that: Two Spawn Packets are sent, 1 with components A, B, and 1 with components A, B, C
154        // action_index will be the same for both, however ...
155
156        // do not process any spawn OLDER than last received spawn index / despawn index
157        if let Some(last_index) = self.last_canonical_index {
158            if sequence_less_than(action_index, last_index) {
159                return;
160            }
161        }
162
163        if !self.spawned {
164            self.spawned = true;
165            outgoing_actions.push(EntityAction::SpawnEntity(self.entity, components));
166
167            // pop ALL waiting spawns, despawns, inserts, and removes OLDER than spawn_index
168            self.receive_canonical(action_index);
169
170            // process any waiting despawns
171            if let Some((despawn_index, _)) = self.waiting_despawns.inner.pop_front() {
172                self.receive_despawn_entity_action(despawn_index, outgoing_actions);
173            } else {
174                // process any waiting inserts
175                let mut inserted_components = Vec::new();
176                for (component, component_state) in &mut self.components {
177                    if let Some(insert_index) = component_state.waiting_inserts.inner.pop_front() {
178                        inserted_components.push((insert_index, *component));
179                    }
180                }
181
182                for ((index, _), component) in inserted_components {
183                    self.receive_insert_component_action(index, component, outgoing_actions);
184                }
185            }
186        } else {
187            // buffer spawn for later
188            self.waiting_spawns.push_back(action_index, components);
189        }
190    }
191
192    /// Process the entity despawn action
193    /// When the entity has actually been despawned on the client, add an ack to the
194    /// `outgoing_actions`
195    pub fn receive_despawn_entity_action(
196        &mut self,
197        index: ActionIndex,
198        outgoing_actions: &mut Vec<EntityAction<E>>,
199    ) {
200        // do not process any despawn OLDER than last received spawn index / despawn index
201        if let Some(last_index) = self.last_canonical_index {
202            if sequence_less_than(index, last_index) {
203                return;
204            }
205        }
206
207        if self.spawned {
208            self.spawned = false;
209            outgoing_actions.push(EntityAction::DespawnEntity(self.entity));
210
211            // pop ALL waiting spawns, despawns, inserts, and removes OLDER than despawn_index
212            self.receive_canonical(index);
213
214            // set all component channels to 'inserted = false'
215            for value in self.components.values_mut() {
216                value.inserted = false;
217            }
218
219            // process any waiting spawns
220            if let Some((spawn_index, components)) = self.waiting_spawns.inner.pop_front() {
221                self.receive_spawn_entity_action(spawn_index, components, outgoing_actions);
222            }
223        } else {
224            // buffer despawn for later
225            self.waiting_despawns.push_back(index, ());
226        }
227    }
228
229    pub fn receive_insert_component_action(
230        &mut self,
231        index: ActionIndex,
232        component: ComponentKind,
233        outgoing_actions: &mut Vec<EntityAction<E>>,
234    ) {
235        // do not process any insert OLDER than last received spawn index / despawn index
236        if let Some(last_index) = self.last_canonical_index {
237            if sequence_less_than(index, last_index) {
238                return;
239            }
240        }
241
242        if let std::collections::hash_map::Entry::Vacant(e) = self.components.entry(component) {
243            e.insert(ComponentChannel::new(self.last_canonical_index));
244        }
245        let component_state = self.components.get_mut(&component).unwrap();
246
247        // do not process any insert OLDER than last received insert / remove index for
248        // this component
249        if let Some(last_index) = component_state.last_canonical_index {
250            if sequence_less_than(index, last_index) {
251                return;
252            }
253        }
254
255        if !component_state.inserted {
256            component_state.inserted = true;
257            outgoing_actions.push(EntityAction::InsertComponent(self.entity, component));
258
259            // pop ALL waiting inserts, and removes OLDER than insert_index (in reference to
260            // component)
261            component_state.receive_canonical(index);
262
263            // process any waiting removes
264            if let Some((remove_index, _)) = component_state.waiting_removes.inner.pop_front() {
265                self.receive_remove_component_action(remove_index, component, outgoing_actions);
266            }
267        } else {
268            // buffer insert
269            component_state.waiting_inserts.push_back(index, ());
270        }
271    }
272
273    pub fn receive_remove_component_action(
274        &mut self,
275        index: ActionIndex,
276        component: ComponentKind,
277        outgoing_actions: &mut Vec<EntityAction<E>>,
278    ) {
279        // do not process any remove OLDER than last received spawn index / despawn index
280        if let Some(last_index) = self.last_canonical_index {
281            if sequence_less_than(index, last_index) {
282                return;
283            }
284        }
285
286        if let std::collections::hash_map::Entry::Vacant(e) = self.components.entry(component) {
287            e.insert(ComponentChannel::new(self.last_canonical_index));
288        }
289        let component_state = self.components.get_mut(&component).unwrap();
290
291        // do not process any remove OLDER than last received insert / remove index for
292        // this component
293        if let Some(last_index) = component_state.last_canonical_index {
294            if sequence_less_than(index, last_index) {
295                return;
296            }
297        }
298
299        if component_state.inserted {
300            component_state.inserted = false;
301            outgoing_actions.push(EntityAction::RemoveComponent(self.entity, component));
302
303            // pop ALL waiting inserts, and removes OLDER than remove_index (in reference to
304            // component)
305            component_state.receive_canonical(index);
306
307            // process any waiting inserts
308            if let Some((insert_index, _)) = component_state.waiting_inserts.inner.pop_front() {
309                self.receive_insert_component_action(insert_index, component, outgoing_actions);
310            }
311        } else {
312            // buffer remove
313            component_state.waiting_removes.push_back(index, ());
314        }
315    }
316
317    pub fn receive_canonical(&mut self, index: ActionIndex) {
318        // pop ALL waiting spawns, despawns, inserts, and removes OLDER than index
319        self.waiting_spawns.pop_front_until_and_including(index);
320        self.waiting_despawns.pop_front_until_and_including(index);
321        for component_state in self.components.values_mut() {
322            component_state.receive_canonical(index);
323        }
324
325        self.last_canonical_index = Some(index);
326    }
327}
328
329// Component Channel
330// most of this should be public, no methods here
331
332pub struct ComponentChannel<E: Copy + Hash + Eq> {
333    pub inserted: bool,
334    pub last_canonical_index: Option<ActionIndex>,
335    pub waiting_inserts: OrderedIds<()>,
336    pub waiting_removes: OrderedIds<()>,
337
338    phantom_e: PhantomData<E>,
339}
340
341impl<E: Copy + Hash + Eq> ComponentChannel<E> {
342    pub fn new(canonical_index: Option<ActionIndex>) -> Self {
343        Self {
344            inserted: false,
345            waiting_inserts: OrderedIds::new(),
346            waiting_removes: OrderedIds::new(),
347            last_canonical_index: canonical_index,
348
349            phantom_e: PhantomData,
350        }
351    }
352
353    pub fn receive_canonical(&mut self, index: ActionIndex) {
354        // pop ALL waiting inserts, and removes OLDER than index
355        self.waiting_inserts.pop_front_until_and_including(index);
356        self.waiting_removes.pop_front_until_and_including(index);
357
358        self.last_canonical_index = Some(index);
359    }
360}
361
362pub struct OrderedIds<P> {
363    // front small, back big
364    inner: VecDeque<(ActionIndex, P)>,
365}
366
367impl<P> OrderedIds<P> {
368    pub fn new() -> Self {
369        Self {
370            inner: VecDeque::new(),
371        }
372    }
373
374    // pub fn push_front(&mut self, index: ActionIndex) {
375    //     let mut index = 0;
376    //
377    //     loop {
378    //         if index == self.inner.len() {
379    //             self.inner.push_back(index);
380    //             return;
381    //         }
382    //
383    //         let old_index = self.inner.get(index).unwrap();
384    //         if sequence_greater_than(*old_index, index) {
385    //             self.inner.insert(index, index);
386    //             return;
387    //         }
388    //
389    //         index += 1
390    //     }
391    // }
392
393    pub fn push_back(&mut self, action_index: ActionIndex, item: P) {
394        let mut current_index = self.inner.len();
395
396        loop {
397            if current_index == 0 {
398                self.inner.push_front((action_index, item));
399                return;
400            }
401
402            current_index -= 1;
403
404            let (old_index, _) = self.inner.get(current_index).unwrap();
405            if sequence_less_than(*old_index, action_index) {
406                self.inner.insert(current_index + 1, (action_index, item));
407                return;
408            }
409        }
410    }
411
412    pub fn pop_front_until_and_including(&mut self, index: ActionIndex) {
413        let mut pop = false;
414
415        if let Some((old_index, _)) = self.inner.front() {
416            if *old_index == index || sequence_less_than(*old_index, index) {
417                pop = true;
418            }
419        }
420
421        if pop {
422            self.inner.pop_front();
423        }
424    }
425}