naia_shared/world/host/
host_world_manager.rs

1use std::{
2    clone::Clone,
3    collections::{HashMap, HashSet, VecDeque},
4    hash::Hash,
5    net::SocketAddr,
6    time::Duration,
7};
8
9use crate::{
10    sequence_list::SequenceList,
11    world::{
12        entity::entity_converters::GlobalWorldManagerType, local_world_manager::LocalWorldManager,
13    },
14    ComponentKind, DiffMask, EntityAction, HostEntity, Instant, MessageIndex, PacketIndex,
15    WorldRefType,
16};
17
18use super::{entity_action_event::EntityActionEvent, world_channel::WorldChannel};
19
20const DROP_UPDATE_RTT_FACTOR: f32 = 1.5;
21const ACTION_RECORD_TTL: Duration = Duration::from_secs(60);
22
23pub type ActionId = MessageIndex;
24
25/// Manages Entities for a given Client connection and keeps them in
26/// sync on the Client
27pub struct HostWorldManager<E: Copy + Eq + Hash + Send + Sync> {
28    // World
29    pub world_channel: WorldChannel<E>,
30
31    // Actions
32    pub sent_action_packets: SequenceList<(Instant, Vec<(ActionId, EntityAction<E>)>)>,
33
34    // Updates
35    /// Map of component updates and [`DiffMask`] that were written into each packet
36    pub sent_updates: HashMap<PacketIndex, (Instant, HashMap<(E, ComponentKind), DiffMask>)>,
37    /// Last [`PacketIndex`] where a component update was written by the server
38    pub last_update_packet_index: PacketIndex,
39}
40
41pub struct HostWorldEvents<E: Copy + Eq + Hash + Send + Sync> {
42    pub next_send_actions: VecDeque<(ActionId, EntityActionEvent<E>)>,
43    pub next_send_updates: HashMap<E, HashSet<ComponentKind>>,
44}
45
46impl<E: Copy + Eq + Hash + Send + Sync> HostWorldEvents<E> {
47    pub fn has_events(&self) -> bool {
48        !self.next_send_actions.is_empty() || !self.next_send_updates.is_empty()
49    }
50}
51
52impl<E: Copy + Eq + Hash + Send + Sync> HostWorldManager<E> {
53    /// Create a new HostWorldManager, given the client's address
54    pub fn new(
55        address: &Option<SocketAddr>,
56        global_world_manager: &dyn GlobalWorldManagerType<E>,
57    ) -> Self {
58        HostWorldManager {
59            // World
60            world_channel: WorldChannel::new(address, global_world_manager),
61            sent_action_packets: SequenceList::new(),
62
63            // Update
64            sent_updates: HashMap::new(),
65            last_update_packet_index: 0,
66        }
67    }
68
69    // World
70
71    // used when Entity first comes into Connection's scope
72    pub fn init_entity(
73        &mut self,
74        world_manager: &mut LocalWorldManager<E>,
75        entity: &E,
76        component_kinds: Vec<ComponentKind>,
77    ) {
78        // add entity
79        self.spawn_entity(world_manager, entity, &component_kinds);
80        // add components
81        for component_kind in component_kinds {
82            self.insert_component(entity, &component_kind);
83        }
84    }
85
86    pub fn spawn_entity(
87        &mut self,
88        world_manager: &mut LocalWorldManager<E>,
89        entity: &E,
90        component_kinds: &Vec<ComponentKind>,
91    ) {
92        self.world_channel
93            .host_spawn_entity(world_manager, entity, component_kinds);
94    }
95
96    pub fn despawn_entity(&mut self, entity: &E) {
97        self.world_channel.host_despawn_entity(entity);
98    }
99
100    pub fn client_initiated_despawn(&mut self, entity: &E) {
101        self.world_channel.client_initiated_despawn(entity);
102    }
103
104    pub fn insert_component(&mut self, entity: &E, component_kind: &ComponentKind) {
105        self.world_channel
106            .host_insert_component(entity, component_kind);
107    }
108
109    pub fn remove_component(&mut self, entity: &E, component_kind: &ComponentKind) {
110        self.world_channel
111            .host_remove_component(entity, component_kind);
112    }
113
114    pub fn host_has_entity(&self, entity: &E) -> bool {
115        self.world_channel.host_has_entity(entity)
116    }
117
118    // used when Remote Entity gains Write Authority (delegation)
119    pub fn track_remote_entity(
120        &mut self,
121        local_world_manager: &mut LocalWorldManager<E>,
122        entity: &E,
123        component_kinds: Vec<ComponentKind>,
124    ) -> HostEntity {
125        // add entity
126        let new_host_entity =
127            self.world_channel
128                .track_remote_entity(local_world_manager, entity, &component_kinds);
129
130        // info!("--- tracking remote entity ---");
131
132        // add components
133        for component_kind in component_kinds {
134            self.track_remote_component(entity, &component_kind);
135        }
136
137        // info!("--- ---------------------- ---");
138
139        new_host_entity
140    }
141
142    pub fn untrack_remote_entity(
143        &mut self,
144        local_world_manager: &mut LocalWorldManager<E>,
145        entity: &E,
146    ) {
147        self.world_channel
148            .untrack_remote_entity(local_world_manager, entity);
149    }
150
151    pub fn track_remote_component(&mut self, entity: &E, component_kind: &ComponentKind) {
152        self.world_channel
153            .track_remote_component(entity, component_kind);
154    }
155
156    // Messages
157
158    pub fn handle_dropped_packets(&mut self, now: &Instant, rtt_millis: &f32) {
159        self.handle_dropped_update_packets(now, rtt_millis);
160        self.handle_dropped_action_packets(now);
161    }
162
163    // Collecting
164
165    fn handle_dropped_action_packets(&mut self, now: &Instant) {
166        let mut pop = false;
167
168        loop {
169            if let Some((_, (time_sent, _))) = self.sent_action_packets.front() {
170                if time_sent.elapsed(now) > ACTION_RECORD_TTL {
171                    pop = true;
172                }
173            } else {
174                return;
175            }
176            if pop {
177                self.sent_action_packets.pop_front();
178            } else {
179                return;
180            }
181        }
182    }
183
184    fn handle_dropped_update_packets(&mut self, now: &Instant, rtt_millis: &f32) {
185        let drop_duration = Duration::from_millis((DROP_UPDATE_RTT_FACTOR * rtt_millis) as u64);
186
187        {
188            let mut dropped_packets = Vec::new();
189            for (packet_index, (time_sent, _)) in &self.sent_updates {
190                let elapsed_since_send = time_sent.elapsed(now);
191                if elapsed_since_send > drop_duration {
192                    dropped_packets.push(*packet_index);
193                }
194            }
195
196            for packet_index in dropped_packets {
197                self.dropped_update_cleanup(packet_index);
198            }
199        }
200    }
201
202    fn dropped_update_cleanup(&mut self, dropped_packet_index: PacketIndex) {
203        if let Some((_, diff_mask_map)) = self.sent_updates.remove(&dropped_packet_index) {
204            for (component_index, diff_mask) in &diff_mask_map {
205                let (entity, component) = component_index;
206                if !self
207                    .world_channel
208                    .diff_handler
209                    .has_component(entity, component)
210                {
211                    continue;
212                }
213                let mut new_diff_mask = diff_mask.clone();
214
215                // walk from dropped packet up to most recently sent packet
216                if dropped_packet_index != self.last_update_packet_index {
217                    let mut packet_index = dropped_packet_index.wrapping_add(1);
218                    while packet_index != self.last_update_packet_index {
219                        if let Some((_, diff_mask_map)) = self.sent_updates.get(&packet_index) {
220                            if let Some(next_diff_mask) = diff_mask_map.get(component_index) {
221                                new_diff_mask.nand(next_diff_mask);
222                            }
223                        }
224
225                        packet_index = packet_index.wrapping_add(1);
226                    }
227                }
228
229                self.world_channel
230                    .diff_handler
231                    .or_diff_mask(entity, component, &new_diff_mask);
232            }
233        }
234    }
235
236    pub fn take_outgoing_events<W: WorldRefType<E>>(
237        &mut self,
238        world: &W,
239        global_world_manager: &dyn GlobalWorldManagerType<E>,
240        now: &Instant,
241        rtt_millis: &f32,
242    ) -> HostWorldEvents<E> {
243        HostWorldEvents {
244            next_send_actions: self.world_channel.take_next_actions(now, rtt_millis),
245            next_send_updates: self
246                .world_channel
247                .collect_next_updates(world, global_world_manager),
248        }
249    }
250}
251
252impl<E: Copy + Eq + Hash + Send + Sync> HostWorldManager<E> {
253    pub fn notify_packet_delivered(
254        &mut self,
255        packet_index: PacketIndex,
256        local_world_manager: &mut LocalWorldManager<E>,
257    ) {
258        // Updates
259        self.sent_updates.remove(&packet_index);
260
261        // Actions
262        if let Some((_, action_list)) = self
263            .sent_action_packets
264            .remove_scan_from_front(&packet_index)
265        {
266            for (action_id, action) in action_list {
267                self.world_channel
268                    .action_delivered(local_world_manager, action_id, action);
269            }
270        }
271    }
272}