naia_shared/connection/
base_connection.rs

1use std::{hash::Hash, net::SocketAddr};
2
3use naia_serde::{BitReader, BitWriter, Serde, SerdeErr};
4use naia_socket_shared::Instant;
5
6use crate::{
7    backends::Timer,
8    messages::{channels::channel_kinds::ChannelKinds, message_manager::MessageManager},
9    types::{HostType, PacketIndex},
10    world::{
11        entity::entity_converters::{EntityConverterMut, GlobalWorldManagerType},
12        host::{host_world_manager::HostWorldEvents, host_world_writer::HostWorldWriter},
13        local_world_manager::LocalWorldManager,
14        remote::remote_world_reader::RemoteWorldReader,
15    },
16    HostWorldManager, Protocol, RemoteWorldManager, Tick, WorldRefType,
17};
18
19use super::{
20    ack_manager::AckManager, connection_config::ConnectionConfig,
21    packet_notifiable::PacketNotifiable, packet_type::PacketType, standard_header::StandardHeader,
22};
23
24/// Represents a connection to a remote host, and provides functionality to
25/// manage the connection and the communications to it
26pub struct BaseConnection<E: Copy + Eq + Hash + Send + Sync> {
27    pub message_manager: MessageManager,
28    pub host_world_manager: HostWorldManager<E>,
29    pub remote_world_manager: RemoteWorldManager<E>,
30    pub remote_world_reader: RemoteWorldReader<E>,
31    pub local_world_manager: LocalWorldManager<E>,
32    heartbeat_timer: Timer,
33    timeout_timer: Timer,
34    ack_manager: AckManager,
35}
36
37impl<E: Copy + Eq + Hash + Send + Sync> BaseConnection<E> {
38    /// Create a new BaseConnection, given the appropriate underlying managers
39    pub fn new(
40        address: &Option<SocketAddr>,
41        host_type: HostType,
42        user_key: u64,
43        connection_config: &ConnectionConfig,
44        channel_kinds: &ChannelKinds,
45        global_world_manager: &dyn GlobalWorldManagerType<E>,
46    ) -> Self {
47        Self {
48            heartbeat_timer: Timer::new(connection_config.heartbeat_interval),
49            timeout_timer: Timer::new(connection_config.disconnection_timeout_duration),
50            ack_manager: AckManager::new(),
51            message_manager: MessageManager::new(host_type, channel_kinds),
52            host_world_manager: HostWorldManager::new(address, global_world_manager),
53            remote_world_manager: RemoteWorldManager::new(),
54            remote_world_reader: RemoteWorldReader::new(),
55            local_world_manager: LocalWorldManager::new(user_key),
56        }
57    }
58
59    // Heartbeats
60
61    /// Record that a message has been sent (to prevent needing to send a
62    /// heartbeat)
63    pub fn mark_sent(&mut self) {
64        self.heartbeat_timer.reset();
65        self.ack_manager.clear_should_send_empty_ack();
66    }
67
68    /// Returns whether a heartbeat message should be sent
69    pub fn should_send_heartbeat(&self) -> bool {
70        self.heartbeat_timer.ringing()
71    }
72
73    // Timeouts
74
75    /// Record that a message has been received from a remote host (to prevent
76    /// disconnecting from the remote host)
77    pub fn mark_heard(&mut self) {
78        self.timeout_timer.reset()
79    }
80
81    /// Returns whether this connection should be dropped as a result of a
82    /// timeout
83    pub fn should_drop(&self) -> bool {
84        self.timeout_timer.ringing()
85    }
86
87    // Acks & Headers
88
89    pub fn mark_should_send_empty_ack(&mut self) {
90        self.ack_manager.mark_should_send_empty_ack();
91    }
92
93    pub fn should_send_empty_ack(&self) -> bool {
94        self.ack_manager.should_send_empty_ack()
95    }
96
97    /// Process an incoming packet, pulling out the packet index number to keep
98    /// track of the current RTT, and sending the packet to the AckManager to
99    /// handle packet notification events
100    pub fn process_incoming_header(
101        &mut self,
102        header: &StandardHeader,
103        packet_notifiables: &mut [&mut dyn PacketNotifiable],
104    ) {
105        self.ack_manager.process_incoming_header(
106            header,
107            &mut self.message_manager,
108            &mut self.host_world_manager,
109            &mut self.local_world_manager,
110            packet_notifiables,
111        );
112    }
113
114    /// Given a packet payload, start tracking the packet via it's index, attach
115    /// the appropriate header, and return the packet's resulting underlying
116    /// bytes
117    pub fn write_header(&mut self, packet_type: PacketType, writer: &mut BitWriter) {
118        // Add header onto message!
119        self.ack_manager
120            .next_outgoing_packet_header(packet_type)
121            .ser(writer);
122    }
123
124    /// Get the next outgoing packet's index
125    pub fn next_packet_index(&self) -> PacketIndex {
126        self.ack_manager.next_sender_packet_index()
127    }
128
129    pub fn collect_messages(&mut self, now: &Instant, rtt_millis: &f32) {
130        self.host_world_manager
131            .handle_dropped_packets(now, rtt_millis);
132        self.message_manager
133            .collect_outgoing_messages(now, rtt_millis);
134    }
135
136    fn write_messages(
137        &mut self,
138        protocol: &Protocol,
139        global_world_manager: &dyn GlobalWorldManagerType<E>,
140        writer: &mut BitWriter,
141        packet_index: PacketIndex,
142        has_written: &mut bool,
143    ) {
144        let mut converter =
145            EntityConverterMut::new(global_world_manager, &mut self.local_world_manager);
146        self.message_manager.write_messages(
147            protocol,
148            &mut converter,
149            writer,
150            packet_index,
151            has_written,
152        );
153    }
154
155    pub fn write_packet<W: WorldRefType<E>>(
156        &mut self,
157        protocol: &Protocol,
158        now: &Instant,
159        writer: &mut BitWriter,
160        packet_index: PacketIndex,
161        world: &W,
162        global_world_manager: &dyn GlobalWorldManagerType<E>,
163        has_written: &mut bool,
164        write_world_events: bool,
165        host_world_events: &mut HostWorldEvents<E>,
166    ) {
167        // write messages
168        self.write_messages(
169            &protocol,
170            global_world_manager,
171            writer,
172            packet_index,
173            has_written,
174        );
175
176        // write world events
177        if write_world_events {
178            HostWorldWriter::write_into_packet(
179                &protocol.component_kinds,
180                now,
181                writer,
182                &packet_index,
183                world,
184                global_world_manager,
185                &mut self.local_world_manager,
186                has_written,
187                &mut self.host_world_manager,
188                host_world_events,
189            );
190        }
191    }
192
193    pub fn read_packet(
194        &mut self,
195        protocol: &Protocol,
196        client_tick: &Tick,
197        global_world_manager: &dyn GlobalWorldManagerType<E>,
198        read_world_events: bool,
199        reader: &mut BitReader,
200    ) -> Result<(), SerdeErr> {
201        // read messages
202        self.message_manager.read_messages(
203            protocol,
204            &mut self.remote_world_manager.entity_waitlist,
205            global_world_manager.to_global_entity_converter(),
206            &self.local_world_manager,
207            reader,
208        )?;
209
210        // read world events
211        if read_world_events {
212            self.remote_world_reader.read_world_events(
213                global_world_manager,
214                &mut self.local_world_manager,
215                protocol,
216                client_tick,
217                reader,
218            )?;
219        }
220
221        Ok(())
222    }
223
224    pub fn remote_entities(&self) -> Vec<E> {
225        self.local_world_manager.remote_entities()
226    }
227}