1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 hash::Hash,
4 net::SocketAddr,
5};
6
7use naia_serde::{BitReader, BitWriter, Serde, SerdeErr};
8use naia_socket_shared::Instant;
9
10use crate::connection::bandwidth_accumulator::BandwidthAccumulator;
11use crate::world::local::local_world_manager::LocalWorldManager;
12use crate::world::world_reader::WorldReader;
13use crate::world::world_writer::WorldWriter;
14use crate::{
15 messages::{channels::channel_kinds::ChannelKinds, message_manager::MessageManager},
16 types::{HostType, PacketIndex},
17 world::{
18 entity::entity_converters::GlobalWorldManagerType, host::host_world_manager::CommandId,
19 },
20 AckManager, ComponentKind, ComponentKinds, ConnectionConfig, EntityAndGlobalEntityConverter,
21 EntityCommand, GlobalEntity, MessageKinds, PacketNotifiable, PacketType, StandardHeader, Tick,
22 Timer, WorldRefType,
23};
24
25pub struct BaseConnection {
28 pub message_manager: MessageManager,
30 pub world_manager: LocalWorldManager,
32 ack_manager: AckManager,
33 heartbeat_timer: Timer,
34 bandwidth_accumulator: BandwidthAccumulator,
35}
36
37impl BaseConnection {
38 pub fn new(
40 connection_config: &ConnectionConfig,
41 address: &Option<SocketAddr>,
42 host_type: HostType,
43 user_key: u64,
44 channel_kinds: &ChannelKinds,
45 global_world_manager: &dyn GlobalWorldManagerType,
46 ) -> Self {
47 Self {
48 message_manager: MessageManager::new(host_type, channel_kinds),
49 world_manager: LocalWorldManager::new(
50 address,
51 host_type,
52 user_key,
53 global_world_manager,
54 ),
55 ack_manager: AckManager::new(),
56 heartbeat_timer: Timer::new(connection_config.heartbeat_interval),
57 bandwidth_accumulator: BandwidthAccumulator::new(&connection_config.bandwidth),
58 }
59 }
60
61 pub fn accumulate_bandwidth(&mut self, now: &Instant) {
66 self.bandwidth_accumulator.accumulate(now);
67 }
68
69 pub fn can_spend_bandwidth(&self, estimated_bytes: u32) -> bool {
73 self.bandwidth_accumulator.can_spend(estimated_bytes)
74 }
75
76 pub fn spend_bandwidth(&mut self, actual_bytes: u32) {
78 self.bandwidth_accumulator.spend(actual_bytes);
79 }
80
81 pub fn bandwidth_remaining(&self) -> f64 {
83 self.bandwidth_accumulator.remaining()
84 }
85
86 pub fn bandwidth_bytes_sent_last_tick(&self) -> u64 {
88 self.bandwidth_accumulator.bytes_sent_last_tick()
89 }
90
91 pub fn bandwidth_packets_deferred_last_tick(&self) -> u32 {
94 self.bandwidth_accumulator.packets_deferred_last_tick()
95 }
96
97 pub fn record_bandwidth_deferred(&mut self) {
100 self.bandwidth_accumulator.record_deferred();
101 }
102
103 pub fn mark_sent(&mut self) {
108 self.heartbeat_timer.reset();
109 self.ack_manager.clear_should_send_empty_ack();
110 }
111
112 pub fn should_send_heartbeat(&self) -> bool {
114 self.heartbeat_timer.ringing()
115 }
116
117 pub fn mark_should_send_empty_ack(&mut self) {
121 self.ack_manager.mark_should_send_empty_ack();
122 }
123
124 pub fn should_send_empty_ack(&self) -> bool {
126 self.ack_manager.should_send_empty_ack()
127 }
128
129 pub fn take_should_send_empty_ack(&mut self) -> bool {
131 self.ack_manager.take_should_send_empty_ack()
132 }
133
134 pub fn process_incoming_header(
138 &mut self,
139 header: &StandardHeader,
140 packet_notifiables: &mut [&mut dyn PacketNotifiable],
141 ) {
142 let mut base_packet_notifiables: [&mut dyn PacketNotifiable; 2] =
143 [&mut self.message_manager, &mut self.world_manager];
144 self.ack_manager.process_incoming_header(
145 header,
146 &mut base_packet_notifiables,
147 packet_notifiables,
148 );
149 }
150
151 pub fn write_header(
155 &mut self,
156 packet_type: PacketType,
157 writer: &mut BitWriter,
158 ) -> StandardHeader {
159 let header = self.ack_manager.next_outgoing_packet_header(packet_type);
160 header.ser(writer);
161 header
162 }
163
164 pub fn next_packet_index(&self) -> PacketIndex {
166 self.ack_manager.next_sender_packet_index()
167 }
168
169 pub fn last_received_packet_index(&self) -> PacketIndex {
171 self.ack_manager.last_received_packet_index()
172 }
173
174 pub fn packet_loss_pct(&self) -> f32 {
176 self.ack_manager.packet_loss_pct()
177 }
178
179 pub fn collect_messages(&mut self, now: &Instant, rtt_millis: &f32) {
181 self.world_manager.collect_messages(now, rtt_millis);
182 self.message_manager
183 .collect_outgoing_messages(now, rtt_millis);
184 }
185
186 fn write_messages(
187 &mut self,
188 channel_kinds: &ChannelKinds,
189 message_kinds: &MessageKinds,
190 global_world_manager: &dyn GlobalWorldManagerType,
191 writer: &mut BitWriter,
192 packet_index: PacketIndex,
193 has_written: &mut bool,
194 ) {
195 let mut converter = self
196 .world_manager
197 .entity_converter_mut(global_world_manager);
198 self.message_manager.write_messages(
199 channel_kinds,
200 message_kinds,
201 &mut converter,
202 writer,
203 packet_index,
204 has_written,
205 );
206 }
207
208 #[allow(clippy::too_many_arguments)]
210 pub fn write_packet<E: Copy + Eq + Hash + Sync + Send, W: WorldRefType<E>>(
211 &mut self,
212 channel_kinds: &ChannelKinds,
213 message_kinds: &MessageKinds,
214 component_kinds: &ComponentKinds,
215 now: &Instant,
216 writer: &mut BitWriter,
217 packet_index: PacketIndex,
218 world: &W,
219 entity_converter: &dyn EntityAndGlobalEntityConverter<E>,
220 global_world_manager: &dyn GlobalWorldManagerType,
221 has_written: &mut bool,
222 write_world_events: bool,
223 host_world_events: &mut VecDeque<(CommandId, EntityCommand)>,
224 update_events: &mut HashMap<GlobalEntity, HashSet<ComponentKind>>,
225 entity_priority_order: Option<&[GlobalEntity]>,
226 ) {
227 self.write_messages(
229 channel_kinds,
230 message_kinds,
231 global_world_manager,
232 writer,
233 packet_index,
234 has_written,
235 );
236
237 if write_world_events {
239 WorldWriter::write_into_packet(
240 component_kinds,
241 now,
242 writer,
243 &packet_index,
244 world,
245 entity_converter,
246 global_world_manager,
247 &mut self.world_manager,
248 has_written,
249 host_world_events,
250 update_events,
251 entity_priority_order,
252 );
253 }
254 }
255
256 pub fn read_packet(
258 &mut self,
259 channel_kinds: &ChannelKinds,
260 message_kinds: &MessageKinds,
261 component_kinds: &ComponentKinds,
262 tick: &Tick,
263 read_world_events: bool,
264 reader: &mut BitReader,
265 ) -> Result<(), SerdeErr> {
266 self.message_manager.read_messages(
268 channel_kinds,
269 message_kinds,
270 &mut self.world_manager,
271 reader,
272 )?;
273
274 if read_world_events {
276 WorldReader::read_world_events(&mut self.world_manager, component_kinds, tick, reader)?;
277 }
278
279 Ok(())
280 }
281}