Skip to main content

naia_shared/connection/
base_connection.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    hash::Hash,
4    net::SocketAddr,
5};
6
7use naia_serde::{BitReader, BitWriter, Serde, SerdeErr};
8use naia_socket_shared::Instant;
9
10use crate::connection::bandwidth_accumulator::BandwidthAccumulator;
11use crate::world::local::local_world_manager::LocalWorldManager;
12use crate::world::world_reader::WorldReader;
13use crate::world::world_writer::WorldWriter;
14use crate::{
15    messages::{channels::channel_kinds::ChannelKinds, message_manager::MessageManager},
16    types::{HostType, PacketIndex},
17    world::{
18        entity::entity_converters::GlobalWorldManagerType, host::host_world_manager::CommandId,
19    },
20    AckManager, ComponentKind, ComponentKinds, ConnectionConfig, EntityAndGlobalEntityConverter,
21    EntityCommand, GlobalEntity, MessageKinds, PacketNotifiable, PacketType, StandardHeader, Tick,
22    Timer, WorldRefType,
23};
24
25/// Represents a connection to a remote host, and provides functionality to
26/// manage the connection and the communications to it
27pub struct BaseConnection {
28    /// Manages channel-routed message send/receive queues for this connection.
29    pub message_manager: MessageManager,
30    /// Manages entity-level replication state for this connection.
31    pub world_manager: LocalWorldManager,
32    ack_manager: AckManager,
33    heartbeat_timer: Timer,
34    bandwidth_accumulator: BandwidthAccumulator,
35}
36
37impl BaseConnection {
38    /// Create a new BaseConnection, given the appropriate underlying managers
39    pub fn new(
40        connection_config: &ConnectionConfig,
41        address: &Option<SocketAddr>,
42        host_type: HostType,
43        user_key: u64,
44        channel_kinds: &ChannelKinds,
45        global_world_manager: &dyn GlobalWorldManagerType,
46    ) -> Self {
47        Self {
48            message_manager: MessageManager::new(host_type, channel_kinds),
49            world_manager: LocalWorldManager::new(
50                address,
51                host_type,
52                user_key,
53                global_world_manager,
54            ),
55            ack_manager: AckManager::new(),
56            heartbeat_timer: Timer::new(connection_config.heartbeat_interval),
57            bandwidth_accumulator: BandwidthAccumulator::new(&connection_config.bandwidth),
58        }
59    }
60
61    // Bandwidth accumulator (outbound token-bucket cap)
62
63    /// Tick the bandwidth accumulator, adding `target_bytes_per_sec × dt` to
64    /// the budget and refreshing the one-packet-overshoot allowance.
65    pub fn accumulate_bandwidth(&mut self, now: &Instant) {
66        self.bandwidth_accumulator.accumulate(now);
67    }
68
69    /// Check whether a packet of `estimated_bytes` is permitted under the
70    /// current budget. Allows one MTU-sized overshoot per tick when the
71    /// budget is positive but short.
72    pub fn can_spend_bandwidth(&self, estimated_bytes: u32) -> bool {
73        self.bandwidth_accumulator.can_spend(estimated_bytes)
74    }
75
76    /// Subtract `actual_bytes` from the bandwidth budget after a send.
77    pub fn spend_bandwidth(&mut self, actual_bytes: u32) {
78        self.bandwidth_accumulator.spend(actual_bytes);
79    }
80
81    /// Current remaining budget (may be negative after overshoot).
82    pub fn bandwidth_remaining(&self) -> f64 {
83        self.bandwidth_accumulator.remaining()
84    }
85
86    /// Bytes sent during the most-recently-completed send cycle (D13 telemetry).
87    pub fn bandwidth_bytes_sent_last_tick(&self) -> u64 {
88        self.bandwidth_accumulator.bytes_sent_last_tick()
89    }
90
91    /// Packets deferred by the budget gate during the most-recently-completed
92    /// send cycle. Always 0 unless `bench_instrumentation` is enabled.
93    pub fn bandwidth_packets_deferred_last_tick(&self) -> u32 {
94        self.bandwidth_accumulator.packets_deferred_last_tick()
95    }
96
97    /// Record that a packet was deferred by the budget gate this cycle.
98    /// Invoked from send loops when `can_spend_bandwidth` returns false.
99    pub fn record_bandwidth_deferred(&mut self) {
100        self.bandwidth_accumulator.record_deferred();
101    }
102
103    // Heartbeats
104
105    /// Record that a message has been sent (to prevent needing to send a
106    /// heartbeat)
107    pub fn mark_sent(&mut self) {
108        self.heartbeat_timer.reset();
109        self.ack_manager.clear_should_send_empty_ack();
110    }
111
112    /// Returns whether a heartbeat message should be sent
113    pub fn should_send_heartbeat(&self) -> bool {
114        self.heartbeat_timer.ringing()
115    }
116
117    // Acks & Headers
118
119    /// Sets the flag requesting that an empty ack packet be sent.
120    pub fn mark_should_send_empty_ack(&mut self) {
121        self.ack_manager.mark_should_send_empty_ack();
122    }
123
124    /// Returns `true` if an empty ack should be sent this tick.
125    pub fn should_send_empty_ack(&self) -> bool {
126        self.ack_manager.should_send_empty_ack()
127    }
128
129    /// Returns the empty-ack flag and clears it atomically.
130    pub fn take_should_send_empty_ack(&mut self) -> bool {
131        self.ack_manager.take_should_send_empty_ack()
132    }
133
134    /// Process an incoming packet, pulling out the packet index number to keep
135    /// track of the current RTT, and sending the packet to the AckManager to
136    /// handle packet notification events
137    pub fn process_incoming_header(
138        &mut self,
139        header: &StandardHeader,
140        packet_notifiables: &mut [&mut dyn PacketNotifiable],
141    ) {
142        let mut base_packet_notifiables: [&mut dyn PacketNotifiable; 2] =
143            [&mut self.message_manager, &mut self.world_manager];
144        self.ack_manager.process_incoming_header(
145            header,
146            &mut base_packet_notifiables,
147            packet_notifiables,
148        );
149    }
150
151    /// Given a packet payload, start tracking the packet via it's index, attach
152    /// the appropriate header, and return the packet's resulting underlying
153    /// bytes
154    pub fn write_header(
155        &mut self,
156        packet_type: PacketType,
157        writer: &mut BitWriter,
158    ) -> StandardHeader {
159        let header = self.ack_manager.next_outgoing_packet_header(packet_type);
160        header.ser(writer);
161        header
162    }
163
164    /// Get the next outgoing packet's index
165    pub fn next_packet_index(&self) -> PacketIndex {
166        self.ack_manager.next_sender_packet_index()
167    }
168
169    /// Returns the sequence index of the last received packet from the remote.
170    pub fn last_received_packet_index(&self) -> PacketIndex {
171        self.ack_manager.last_received_packet_index()
172    }
173
174    /// Fraction of sent data-packets that were lost in the last 64-packet window.
175    pub fn packet_loss_pct(&self) -> f32 {
176        self.ack_manager.packet_loss_pct()
177    }
178
179    /// Drains pending world-manager and message-manager outbound queues into writeable packets.
180    pub fn collect_messages(&mut self, now: &Instant, rtt_millis: &f32) {
181        self.world_manager.collect_messages(now, rtt_millis);
182        self.message_manager
183            .collect_outgoing_messages(now, rtt_millis);
184    }
185
186    fn write_messages(
187        &mut self,
188        channel_kinds: &ChannelKinds,
189        message_kinds: &MessageKinds,
190        global_world_manager: &dyn GlobalWorldManagerType,
191        writer: &mut BitWriter,
192        packet_index: PacketIndex,
193        has_written: &mut bool,
194    ) {
195        let mut converter = self
196            .world_manager
197            .entity_converter_mut(global_world_manager);
198        self.message_manager.write_messages(
199            channel_kinds,
200            message_kinds,
201            &mut converter,
202            writer,
203            packet_index,
204            has_written,
205        );
206    }
207
208    /// Serializes messages and world events into `writer` for the outgoing packet at `packet_index`.
209    #[allow(clippy::too_many_arguments)]
210    pub fn write_packet<E: Copy + Eq + Hash + Sync + Send, W: WorldRefType<E>>(
211        &mut self,
212        channel_kinds: &ChannelKinds,
213        message_kinds: &MessageKinds,
214        component_kinds: &ComponentKinds,
215        now: &Instant,
216        writer: &mut BitWriter,
217        packet_index: PacketIndex,
218        world: &W,
219        entity_converter: &dyn EntityAndGlobalEntityConverter<E>,
220        global_world_manager: &dyn GlobalWorldManagerType,
221        has_written: &mut bool,
222        write_world_events: bool,
223        host_world_events: &mut VecDeque<(CommandId, EntityCommand)>,
224        update_events: &mut HashMap<GlobalEntity, HashSet<ComponentKind>>,
225        entity_priority_order: Option<&[GlobalEntity]>,
226    ) {
227        // write messages
228        self.write_messages(
229            channel_kinds,
230            message_kinds,
231            global_world_manager,
232            writer,
233            packet_index,
234            has_written,
235        );
236
237        // write world events
238        if write_world_events {
239            WorldWriter::write_into_packet(
240                component_kinds,
241                now,
242                writer,
243                &packet_index,
244                world,
245                entity_converter,
246                global_world_manager,
247                &mut self.world_manager,
248                has_written,
249                host_world_events,
250                update_events,
251                entity_priority_order,
252            );
253        }
254    }
255
256    /// Deserializes an incoming packet, routing messages and world events to their managers.
257    pub fn read_packet(
258        &mut self,
259        channel_kinds: &ChannelKinds,
260        message_kinds: &MessageKinds,
261        component_kinds: &ComponentKinds,
262        tick: &Tick,
263        read_world_events: bool,
264        reader: &mut BitReader,
265    ) -> Result<(), SerdeErr> {
266        // read messages
267        self.message_manager.read_messages(
268            channel_kinds,
269            message_kinds,
270            &mut self.world_manager,
271            reader,
272        )?;
273
274        // read world events
275        if read_world_events {
276            WorldReader::read_world_events(&mut self.world_manager, component_kinds, tick, reader)?;
277        }
278
279        Ok(())
280    }
281}