use std::{any::Any, hash::Hash, net::SocketAddr};
use log::warn;
use naia_shared::{
BaseConnection, BigMapKey, BitReader, BitWriter, ChannelKind, ChannelKinds, ConnectionConfig,
EntityEventMessage, EntityResponseEvent, HostType, HostWorldEvents, Instant, PacketType,
Protocol, Serde, SerdeErr, StandardHeader, SystemChannel, Tick, WorldMutType, WorldRefType,
};
use crate::request::{GlobalRequestManager, GlobalResponseManager};
use crate::{
connection::{
io::Io, ping_config::PingConfig, tick_buffer_messages::TickBufferMessages,
tick_buffer_receiver::TickBufferReceiver,
},
events::Events,
time_manager::TimeManager,
user::UserKey,
world::global_world_manager::GlobalWorldManager,
};
use super::ping_manager::PingManager;
/// Server-side state kept for one connected user.
///
/// Bundles the shared [`BaseConnection`] with the pieces that only the server
/// tracks per user: ping state and the receiver for tick-buffered messages.
pub struct Connection<E>
where
    E: Copy + Eq + Hash + Send + Sync,
{
    /// Socket address of the remote user; outgoing data packets are sent here.
    pub address: SocketAddr,
    /// Key identifying the user this connection belongs to.
    pub user_key: UserKey,
    /// Shared connection machinery (message manager, world managers, packet
    /// header handling) that the methods of this type delegate to.
    pub base: BaseConnection<E>,
    /// Ping state; its `rtt_average` is read when outgoing packets are collected.
    pub ping_manager: PingManager,
    /// Receiver for tick-buffered messages read out of incoming packets.
    tick_buffer: TickBufferReceiver,
    /// Starts out `false`.
    /// NOTE(review): presumably set when the server disconnects this user on
    /// purpose — confirm against the code that writes it.
    pub manual_disconnect: bool,
}
impl<E: Copy + Eq + Hash + Send + Sync> Connection<E> {
/// Builds the connection state for a user at `user_address`.
///
/// The base connection is created as `HostType::Server` and keyed by
/// `user_key.to_u64()`. `manual_disconnect` starts out `false`.
pub fn new(
    connection_config: &ConnectionConfig,
    ping_config: &PingConfig,
    user_address: &SocketAddr,
    user_key: &UserKey,
    channel_kinds: &ChannelKinds,
    global_world_manager: &GlobalWorldManager<E>,
) -> Self {
    let address = *user_address;

    // Sub-components are constructed in the same order as the struct fields.
    let base = BaseConnection::new(
        &Some(address),
        HostType::Server,
        user_key.to_u64(),
        connection_config,
        channel_kinds,
        global_world_manager,
    );
    let ping_manager = PingManager::new(ping_config);
    let tick_buffer = TickBufferReceiver::new(channel_kinds);

    Self {
        address,
        user_key: *user_key,
        base,
        ping_manager,
        tick_buffer,
        manual_disconnect: false,
    }
}
/// Forwards the header of an incoming packet to the base connection.
///
/// The second argument is always an empty slice, so the server passes no
/// additional items alongside the header.
/// NOTE(review): presumably that slice lists extra components to be notified
/// about packet delivery — confirm against `BaseConnection::process_incoming_header`.
pub fn process_incoming_header(&mut self, header: &StandardHeader) {
self.base.process_incoming_header(header, &mut []);
}
/// Reads the payload of one incoming packet from `reader`.
///
/// Two sections are consumed from the same reader, in this order:
/// 1. tick-buffered messages, handed to the tick-buffer receiver;
/// 2. everything else, handed to the base connection.
///
/// # Errors
///
/// Returns the `SerdeErr` of the first section that fails to read; the
/// second section is not attempted if the first one fails.
pub fn read_packet(
    &mut self,
    protocol: &Protocol,
    server_tick: Tick,
    client_tick: Tick,
    reader: &mut BitReader,
    global_world_manager: &mut GlobalWorldManager<E>,
) -> Result<(), SerdeErr> {
    // Section 1: tick-buffered messages.
    self.tick_buffer.read_messages(
        protocol,
        &server_tick,
        &client_tick,
        global_world_manager,
        &self.base.local_world_manager,
        reader,
    )?;

    // Section 2: the remainder of the packet, read by the base connection.
    let client_authoritative = protocol.client_authoritative_entities;
    self.base.read_packet(
        protocol,
        &client_tick,
        global_world_manager,
        client_authoritative,
        reader,
    )?;

    Ok(())
}
/// Drains everything this connection has received and routes it onward.
///
/// In order:
/// 1. Messages are pulled from the message manager. Those on the
///    `SystemChannel` are decoded as `EntityEventMessage`s and converted into
///    entity response events; all others are pushed into `incoming_events`.
/// 2. Received requests are assigned a global response id and pushed into
///    `incoming_events`; received responses are handed to
///    `global_request_manager`.
/// 3. Only when `protocol.client_authoritative_entities` is set, the remote
///    world events read so far are applied to `world`, and the resulting
///    entity events are recorded in `incoming_events`.
///
/// Returns the entity response events gathered in steps 1 and 3.
///
/// A `SystemChannel` message that is not an `EntityEventMessage` is logged
/// and dropped rather than panicking: the payload originates from the remote
/// peer, and a single malformed message must not bring the server down.
pub fn process_packets<W: WorldMutType<E>>(
    &mut self,
    protocol: &Protocol,
    now: &Instant,
    global_world_manager: &mut GlobalWorldManager<E>,
    global_request_manager: &mut GlobalRequestManager,
    global_response_manager: &mut GlobalResponseManager,
    world: &mut W,
    incoming_events: &mut Events<E>,
) -> Vec<EntityResponseEvent<E>> {
    let mut response_events = Vec::new();

    // --- Messages ---------------------------------------------------------
    let messages = self.base.message_manager.receive_messages(
        &protocol.message_kinds,
        now,
        global_world_manager,
        &self.base.local_world_manager,
        &mut self.base.remote_world_manager.entity_waitlist,
    );
    for (channel_kind, messages) in messages {
        if channel_kind == ChannelKind::of::<SystemChannel>() {
            for message in messages {
                // Untrusted input: skip anything that is not the expected type.
                let Ok(boxed_event) = Box::<dyn Any + 'static>::downcast::<EntityEventMessage>(
                    message.to_boxed_any(),
                ) else {
                    warn!("Received unknown message over SystemChannel! Dropping it.");
                    continue;
                };
                let event_message = *boxed_event;

                match event_message.entity.get(global_world_manager) {
                    Some(entity) => {
                        response_events.push(event_message.action.to_response_event(&entity));
                    }
                    None => {
                        warn!(
                            "Received `{:?}` with no Entity over SystemChannel!",
                            event_message.action
                        );
                    }
                };
            }
        } else {
            for message in messages {
                incoming_events.push_message(&self.user_key, &channel_kind, message);
            }
        }
    }

    // --- Requests & responses ---------------------------------------------
    let (requests, responses) = self.base.message_manager.receive_requests_and_responses();
    for (channel_kind, requests) in requests {
        for (local_response_id, request) in requests {
            // Map the connection-local response id to a server-wide one.
            let global_response_id = global_response_manager.create_response_id(
                &self.user_key,
                &channel_kind,
                &local_response_id,
            );
            incoming_events.push_request(
                &self.user_key,
                &channel_kind,
                global_response_id,
                request,
            );
        }
    }
    for (global_request_id, response) in responses {
        global_request_manager.receive_response(&global_request_id, response);
    }

    // --- Remote world events (client-authoritative entities only) ---------
    if protocol.client_authoritative_entities {
        let remote_events = self.base.remote_world_reader.take_incoming_events();
        let world_events = self.base.remote_world_manager.process_world_events(
            global_world_manager,
            &mut self.base.local_world_manager,
            &protocol.component_kinds,
            world,
            now,
            remote_events,
        );
        response_events
            .extend(incoming_events.receive_entity_events(&self.user_key, world_events));
    }

    response_events
}
/// Moves every message the tick-buffer receiver hands out for `tick` into
/// `messages`, tagged with this connection's user key and the message's channel.
pub fn tick_buffer_messages(&mut self, tick: &Tick, messages: &mut TickBufferMessages) {
    for (kind, batch) in self.tick_buffer.receive_messages(tick) {
        for buffered in batch {
            messages.push_message(&self.user_key, &kind, buffered);
        }
    }
}
/// Sends as many data packets as are needed to flush what is pending.
///
/// Outgoing messages are collected and outgoing world events are taken once,
/// both using the current average RTT from the ping manager. Packets are then
/// written one after another until `send_packet` reports nothing is left.
/// If at least one packet went out, the base connection is marked as sent.
pub fn send_packets<W: WorldRefType<E>>(
    &mut self,
    protocol: &Protocol,
    now: &Instant,
    io: &mut Io,
    world: &W,
    global_world_manager: &GlobalWorldManager<E>,
    time_manager: &TimeManager,
) {
    let rtt_millis = self.ping_manager.rtt_average;
    self.base.collect_messages(now, &rtt_millis);

    let mut pending_world_events = self.base.host_world_manager.take_outgoing_events(
        world,
        global_world_manager,
        now,
        &rtt_millis,
    );

    // Keep emitting packets while there is still something to send.
    let mut sent_any = false;
    while self.send_packet(
        protocol,
        now,
        io,
        world,
        global_world_manager,
        time_manager,
        &mut pending_world_events,
    ) {
        sent_any = true;
    }

    if sent_any {
        self.base.mark_sent();
    }
}
/// Writes and sends a single data packet if anything is pending.
///
/// Returns `false` without doing anything when there are neither host world
/// events nor outgoing messages. Otherwise a packet is written and handed to
/// `io`, and `true` is returned, even if the send itself failed (a failure is
/// only logged).
fn send_packet<W: WorldRefType<E>>(
    &mut self,
    protocol: &Protocol,
    now: &Instant,
    io: &mut Io,
    world: &W,
    global_world_manager: &GlobalWorldManager<E>,
    time_manager: &TimeManager,
    host_world_events: &mut HostWorldEvents<E>,
) -> bool {
    let nothing_pending = !host_world_events.has_events()
        && !self.base.message_manager.has_outgoing_messages();
    if nothing_pending {
        return false;
    }

    let packet_writer = self.write_packet(
        protocol,
        now,
        world,
        global_world_manager,
        time_manager,
        host_world_events,
    );

    let send_result = io.send_packet(&self.address, packet_writer.to_packet());
    if send_result.is_err() {
        warn!("Server Error: Cannot send data packet to {}", &self.address);
    }

    true
}
/// Serialises one outgoing data packet and returns its writer.
///
/// Layout, in write order: the standard header with `PacketType::Data`, the
/// current tick, the current tick instant, then whatever the base connection
/// writes for messages and host world events.
fn write_packet<W: WorldRefType<E>>(
    &mut self,
    protocol: &Protocol,
    now: &Instant,
    world: &W,
    global_world_manager: &GlobalWorldManager<E>,
    time_manager: &TimeManager,
    host_world_events: &mut HostWorldEvents<E>,
) -> BitWriter {
    // The packet index is fetched before the header is written; keep this order.
    let packet_index = self.base.next_packet_index();

    let mut packet = BitWriter::new();
    // NOTE(review): 3 bits are reserved up front, presumably for trailing
    // flag bits written by later stages — confirm against `BaseConnection::write_packet`.
    packet.reserve_bits(3);

    self.base.write_header(PacketType::Data, &mut packet);

    // Time information taken from the time manager.
    time_manager.current_tick().ser(&mut packet);
    time_manager.current_tick_instant().ser(&mut packet);

    let mut wrote_payload = false;
    self.base.write_packet(
        protocol,
        now,
        &mut packet,
        packet_index,
        world,
        global_world_manager,
        &mut wrote_payload,
        true,
        host_world_events,
    );

    packet
}
}