use std::collections::{HashMap, HashSet, VecDeque};
use std::{hash::Hash, net::SocketAddr};
use log::warn;
use naia_shared::{
BaseConnection, BigMapKey, BitReader, BitWriter, ChannelKinds, ComponentKind, ComponentKinds,
ConnectionConfig, EntityAndGlobalEntityConverter, EntityCommand, EntityEvent, GlobalEntity,
GlobalEntitySpawner, HostType, Instant, MessageIndex, MessageKinds, OutgoingPriorityHook,
PacketType, Serde, SerdeErr, StandardHeader, Tick, Timer, WorldMutType, WorldRefType, MTU_SIZE_BYTES,
};
use crate::{
connection::{
io::Io, ping_config::PingConfig, ping_manager::PingManager,
tick_buffer_messages::TickBufferMessages, tick_buffer_receiver::TickBufferReceiver,
},
events::WorldEvents,
request::{GlobalRequestManager, GlobalResponseManager},
time_manager::TimeManager,
user::UserKey,
world::global_world_manager::GlobalWorldManager,
};
cfg_if! {
if #[cfg(feature = "e2e_debug")] {
use std::sync::atomic::Ordering;
use naia_shared::EntityAuthStatus;
use crate::server::world_server::SERVER_TX_FRAMES;
}
}
/// Cumulative timing counters for the phases of `Connection::send_packets`.
///
/// Only compiled with the `bench_instrumentation` feature. Each counter holds a
/// running total of elapsed nanoseconds added by the send path.
#[cfg(feature = "bench_instrumentation")]
pub mod bench_send_counters {
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Nanoseconds accumulated around the `collect_messages` call.
    #[doc(hidden)]
    pub static NS_COLLECT_MESSAGES: AtomicU64 = AtomicU64::new(0);
    /// Nanoseconds accumulated around the `take_outgoing_events` call.
    #[doc(hidden)]
    pub static NS_TAKE_OUTGOING_EVENTS: AtomicU64 = AtomicU64::new(0);
    /// Nanoseconds accumulated around the packet send loop and the priority
    /// reset pass that follows it.
    #[doc(hidden)]
    pub static NS_SEND_PACKET_LOOP: AtomicU64 = AtomicU64::new(0);

    /// Zeroes the three counters of this module, then resets the counters in
    /// `naia_shared::bench_take_events_counters`.
    pub fn reset() {
        for counter in [
            &NS_COLLECT_MESSAGES,
            &NS_TAKE_OUTGOING_EVENTS,
            &NS_SEND_PACKET_LOOP,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
        naia_shared::bench_take_events_counters::reset();
    }

    /// Returns the current totals as
    /// `(collect_messages, take_outgoing_events, send_packet_loop)`.
    pub fn snapshot() -> (u64, u64, u64) {
        let collect_messages = NS_COLLECT_MESSAGES.load(Ordering::Relaxed);
        let take_outgoing_events = NS_TAKE_OUTGOING_EVENTS.load(Ordering::Relaxed);
        let send_packet_loop = NS_SEND_PACKET_LOOP.load(Ordering::Relaxed);
        (collect_messages, take_outgoing_events, send_packet_loop)
    }
}
/// Server-side state for one connected user: addressing, the shared
/// `BaseConnection`, ping tracking, the tick-buffer receiver and the
/// disconnection timeout timer.
pub struct Connection {
/// Remote address of the user; used as the destination for outgoing packets.
pub address: SocketAddr,
/// Key identifying the user this connection belongs to.
pub user_key: UserKey,
/// Underlying connection state; constructed with `HostType::Server` in `new`.
pub base: BaseConnection,
/// Ping state; its `rtt_average` is read when sending packets.
pub ping_manager: PingManager,
/// Receiver for tick-buffered messages read from incoming packets.
tick_buffer: TickBufferReceiver,
/// Initialised to `false` in `new` and not modified in the visible part of this file.
/// NOTE(review): presumably set by the owner on a deliberate disconnect — confirm.
pub manual_disconnect: bool,
/// Timer reset whenever an incoming header is processed; `should_drop`
/// reports whether it is ringing.
timeout_timer: Timer,
}
impl Connection {
/// Builds the server-side connection state for a single user.
///
/// The underlying `BaseConnection` is created as `HostType::Server` with the
/// user's address and `user_key.to_u64()`. The timeout timer is created from
/// `connection_config.disconnection_timeout_duration`, and
/// `manual_disconnect` starts out `false`.
pub fn new(
    connection_config: &ConnectionConfig,
    ping_config: &PingConfig,
    user_address: &SocketAddr,
    user_key: &UserKey,
    channel_kinds: &ChannelKinds,
    global_world_manager: &GlobalWorldManager,
) -> Self {
    let address = *user_address;
    let user_key = *user_key;
    let base = BaseConnection::new(
        connection_config,
        &Some(address),
        HostType::Server,
        user_key.to_u64(),
        channel_kinds,
        global_world_manager,
    );
    let ping_manager = PingManager::new(ping_config);
    let tick_buffer = TickBufferReceiver::new(channel_kinds);
    let timeout_timer = Timer::new(connection_config.disconnection_timeout_duration);

    Self {
        address,
        user_key,
        base,
        ping_manager,
        tick_buffer,
        manual_disconnect: false,
        timeout_timer,
    }
}
/// Returns `true` when the disconnection timeout timer is ringing.
///
/// The timer is reset by `process_incoming_header`, so this presumably
/// signals that nothing has been received for the configured timeout.
pub fn should_drop(&self) -> bool {
self.timeout_timer.ringing()
}
/// Hands an incoming packet header to the base connection and resets the
/// disconnection timeout timer.
pub fn process_incoming_header(&mut self, header: &StandardHeader) {
// An empty slice is passed as the second argument of the base call.
self.base.process_incoming_header(header, &mut []);
// Any processed header counts as activity from the user.
self.timeout_timer.reset();
}
/// Test-only accessor (requires the `test_utils` feature) returning the
/// world manager's `diff_handler_receiver_count()`.
#[cfg(feature = "test_utils")]
pub fn diff_handler_receiver_count(&self) -> usize {
self.base.world_manager.diff_handler_receiver_count()
}
/// Test-only hook (requires the `test_utils` feature) that forwards a message
/// straight to the tick-buffer receiver's `inject_message`, bypassing packet
/// reading, and returns that call's result.
#[cfg(feature = "test_utils")]
pub fn inject_tick_buffer_message(
&mut self,
channel_kind: &naia_shared::ChannelKind,
host_tick: &naia_shared::Tick,
message_tick: &naia_shared::Tick,
message: naia_shared::MessageContainer,
) -> bool {
self.tick_buffer
.inject_message(channel_kind, host_tick, message_tick, message)
}
/// Reads one incoming data packet from `reader`.
///
/// The tick-buffered messages are read first, then the remainder of the
/// packet is handed to the base connection. The first deserialization error
/// from either step is returned to the caller.
#[allow(clippy::too_many_arguments)]
pub fn read_packet(
    &mut self,
    channel_kinds: &ChannelKinds,
    message_kinds: &MessageKinds,
    component_kinds: &ComponentKinds,
    client_authoritative_entities: bool,
    server_tick: Tick,
    client_tick: Tick,
    reader: &mut BitReader,
) -> Result<(), SerdeErr> {
    // Tick-buffered section comes first in the packet.
    let entity_converter = self.base.world_manager.entity_converter();
    self.tick_buffer.read_messages(
        channel_kinds,
        message_kinds,
        &server_tick,
        &client_tick,
        entity_converter,
        reader,
    )?;

    // Everything else is decoded by the base connection.
    self.base.read_packet(
        channel_kinds,
        message_kinds,
        component_kinds,
        &client_tick,
        client_authoritative_entities,
        reader,
    )?;

    Ok(())
}
/// Drains everything received on this connection into the server's event and
/// request/response bookkeeping.
///
/// * Received messages are pushed into `incoming_events`, tagged with this
///   connection's user key and their channel.
/// * Each received request gets a global response id from
///   `global_response_manager` and is pushed into `incoming_events`.
/// * Each received response is handed to `global_request_manager`.
///
/// Returns the world manager's incoming entity events when
/// `client_authoritative_entities` is `true`, otherwise an empty vector.
#[allow(clippy::too_many_arguments)]
pub fn process_packets<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
    &mut self,
    message_kinds: &MessageKinds,
    component_kinds: &ComponentKinds,
    client_authoritative_entities: bool,
    now: &Instant,
    global_entity_map: &mut dyn GlobalEntitySpawner<E>,
    global_world_manager: &mut GlobalWorldManager,
    global_request_manager: &mut GlobalRequestManager,
    global_response_manager: &mut GlobalResponseManager,
    world: &mut W,
    incoming_events: &mut WorldEvents<E>,
) -> Vec<EntityEvent> {
    // Plain messages, grouped per channel.
    let (converter, waitlist) = self.base.world_manager.get_message_processor_helpers();
    let received = self
        .base
        .message_manager
        .receive_messages(message_kinds, now, converter, waitlist);
    for (channel_kind, channel_messages) in received {
        for message in channel_messages {
            incoming_events.push_message(&self.user_key, &channel_kind, message);
        }
    }

    // Requests are surfaced as events; responses go to the request manager.
    let (requests, responses) = self.base.message_manager.receive_requests_and_responses();
    for (channel_kind, channel_requests) in requests {
        for (local_response_id, request) in channel_requests {
            let global_response_id = global_response_manager.create_response_id(
                &self.user_key,
                &channel_kind,
                &local_response_id,
            );
            incoming_events.push_request(
                &self.user_key,
                &channel_kind,
                global_response_id,
                request,
            );
        }
    }
    for (global_request_id, response) in responses {
        global_request_manager.receive_response(&global_request_id, response);
    }

    // Entity events are only taken when clients may be authoritative.
    if !client_authoritative_entities {
        return Vec::new();
    }
    self.base.world_manager.take_incoming_events(
        global_entity_map,
        global_world_manager,
        component_kinds,
        world,
        now,
    )
}
/// Moves the tick-buffered messages received for `tick` into `messages`,
/// tagging each one with this connection's user key and its channel.
pub fn tick_buffer_messages(&mut self, tick: &Tick, messages: &mut TickBufferMessages) {
    for (channel_kind, channel_batch) in self.tick_buffer.receive_messages(tick) {
        for message in channel_batch {
            messages.push_message(&self.user_key, &channel_kind, message);
        }
    }
}
/// Collects pending outgoing data for this connection and sends as many
/// packets as `send_packet` is willing to emit.
///
/// Steps, in order:
/// 1. Collect outgoing messages and take outgoing world events, using the
///    ping manager's current `rtt_average`.
/// 2. Accumulate bandwidth budget on the base connection.
/// 3. Ask `priority_hook` for a priority for every entity with pending
///    updates and order the entities from highest to lowest priority.
/// 4. Call `send_packet` until it returns `false`; if at least one call
///    returned `true`, mark the base connection as sent.
/// 5. For every initially dirty entity that no longer has pending updates,
///    call `priority_hook.reset_after_send` with the current tick.
///
/// Priorities are compared with `f32::total_cmp`, so the ordering is a total
/// order even if the hook returns NaN. A `partial_cmp`-based comparator is
/// not, which leaves the sort order unspecified and may make `sort_by` panic.
///
/// With the `bench_instrumentation` feature, the elapsed time of steps 1 and
/// 4-5 is added to the `bench_send_counters` statics.
#[allow(clippy::too_many_arguments)]
pub fn send_packets<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
    &mut self,
    channel_kinds: &ChannelKinds,
    message_kinds: &MessageKinds,
    component_kinds: &ComponentKinds,
    now: &Instant,
    io: &mut Io,
    world: &W,
    converter: &dyn EntityAndGlobalEntityConverter<E>,
    global_world_manager: &GlobalWorldManager,
    time_manager: &TimeManager,
    priority_hook: &mut dyn OutgoingPriorityHook,
) {
    let rtt_millis = self.ping_manager.rtt_average;

    #[cfg(feature = "bench_instrumentation")]
    let t = std::time::Instant::now();
    self.base.collect_messages(now, &rtt_millis);
    #[cfg(feature = "bench_instrumentation")]
    bench_send_counters::NS_COLLECT_MESSAGES
        .fetch_add(t.elapsed().as_nanos() as u64, std::sync::atomic::Ordering::Relaxed);

    #[cfg(feature = "bench_instrumentation")]
    let t = std::time::Instant::now();
    let (mut host_world_events, mut update_events) = self
        .base
        .world_manager
        .take_outgoing_events(now, &rtt_millis, world, converter, global_world_manager);
    #[cfg(feature = "bench_instrumentation")]
    bench_send_counters::NS_TAKE_OUTGOING_EVENTS
        .fetch_add(t.elapsed().as_nanos() as u64, std::sync::atomic::Ordering::Relaxed);

    #[cfg(feature = "e2e_debug")]
    {
        use crate::server::world_server::{
            SERVER_OUTGOING_CMDS_DRAINED_TOTAL, SERVER_WORLD_MSGS_DRAINED,
        };
        let total_drained = host_world_events.len();
        if total_drained > 0 {
            SERVER_OUTGOING_CMDS_DRAINED_TOTAL.fetch_add(total_drained, Ordering::Relaxed);
            SERVER_WORLD_MSGS_DRAINED.fetch_add(total_drained, Ordering::Relaxed);
        }
    }

    self.base.accumulate_bandwidth(now);

    // Snapshot of the entities that have pending updates before any packet is
    // written; used afterwards to detect which ones were fully flushed.
    let initial_dirty: Vec<GlobalEntity> = update_events.keys().copied().collect();

    // Highest priority first. `total_cmp` keeps the comparator a total order
    // even when the hook yields NaN; the sort is stable, so ties keep the
    // order in which the entities were collected.
    let mut prioritized: Vec<(GlobalEntity, f32)> = initial_dirty
        .iter()
        .map(|e| (*e, priority_hook.advance(e)))
        .collect();
    prioritized.sort_by(|a, b| b.1.total_cmp(&a.1));
    let entity_priority_order: Vec<GlobalEntity> =
        prioritized.into_iter().map(|(e, _)| e).collect();

    #[cfg(feature = "bench_instrumentation")]
    let t = std::time::Instant::now();

    // Keep emitting packets until `send_packet` reports there is nothing more
    // it will send this round.
    let mut any_sent = false;
    while self.send_packet(
        channel_kinds,
        message_kinds,
        component_kinds,
        now,
        io,
        world,
        converter,
        global_world_manager,
        time_manager,
        &mut host_world_events,
        &mut update_events,
        Some(&entity_priority_order),
    ) {
        any_sent = true;
    }
    if any_sent {
        self.base.mark_sent();
    }

    // Entities whose updates were all written are no longer in `update_events`.
    let current_tick = time_manager.current_tick();
    for entity in &initial_dirty {
        if !update_events.contains_key(entity) {
            priority_hook.reset_after_send(entity, current_tick as u32);
        }
    }

    #[cfg(feature = "bench_instrumentation")]
    bench_send_counters::NS_SEND_PACKET_LOOP
        .fetch_add(t.elapsed().as_nanos() as u64, std::sync::atomic::Ordering::Relaxed);
}
/// Tries to emit a single packet for this connection.
///
/// * Nothing queued but an empty ack requested: an ACK-only packet is written
///   and sent; returns `false`.
/// * Messages or events queued: if `can_spend_bandwidth` refuses an MTU-sized
///   packet, the deferral is recorded and `false` is returned. Otherwise a
///   data packet is written and sent, and `true` is returned even when the
///   socket send fails (only a warning is logged).
/// * Nothing queued and no ack requested: returns `false`.
///
/// Bandwidth is only spent for data packets whose send succeeded.
#[allow(clippy::too_many_arguments)]
fn send_packet<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
    &mut self,
    channel_kinds: &ChannelKinds,
    message_kinds: &MessageKinds,
    component_kinds: &ComponentKinds,
    now: &Instant,
    io: &mut Io,
    world: &W,
    entity_converter: &dyn EntityAndGlobalEntityConverter<E>,
    global_world_manager: &GlobalWorldManager,
    time_manager: &TimeManager,
    host_world_events: &mut VecDeque<(MessageIndex, EntityCommand)>,
    update_events: &mut HashMap<GlobalEntity, HashSet<ComponentKind>>,
    entity_priority_order: Option<&[GlobalEntity]>,
) -> bool {
    let has_messages = self.base.message_manager.has_outgoing_messages();
    let has_events = !(host_world_events.is_empty() && update_events.is_empty());
    // Queried on every call, before deciding what (if anything) to send.
    let needs_ack_only = self.base.take_should_send_empty_ack();
    let has_payload = has_events || has_messages;

    if !has_payload {
        if needs_ack_only {
            // Header, tick and tick instant, followed by three `false` bits
            // (matching the three bits reserved up front).
            let mut writer = BitWriter::new();
            writer.reserve_bits(3);
            let _header = self.base.write_header(PacketType::Data, &mut writer);
            time_manager.current_tick().ser(&mut writer);
            time_manager.current_tick_instant().ser(&mut writer);
            for _ in 0..3 {
                false.ser(&mut writer);
            }

            let sent = io.send_packet(&self.address, writer.to_packet()).is_ok();
            if sent {
                #[cfg(feature = "e2e_debug")]
                {
                    SERVER_TX_FRAMES.fetch_add(1, Ordering::Relaxed);
                }
            } else {
                warn!(
                    "Server Error: Cannot send ACK-only packet to {}",
                    &self.address
                );
            }
        }
        return false;
    }

    // A full MTU must fit in the budget before a data packet is attempted.
    if !self.base.can_spend_bandwidth(MTU_SIZE_BYTES as u32) {
        self.base.record_bandwidth_deferred();
        return false;
    }

    let writer = self.write_packet(
        channel_kinds,
        message_kinds,
        component_kinds,
        now,
        world,
        entity_converter,
        global_world_manager,
        time_manager,
        host_world_events,
        update_events,
        entity_priority_order,
    );
    let packet = writer.to_packet();
    let packet_bytes = packet.slice().len() as u32;

    let sent = io.send_packet(&self.address, packet).is_ok();
    if sent {
        self.base.spend_bandwidth(packet_bytes);
        #[cfg(feature = "e2e_debug")]
        {
            use crate::server::world_server::SERVER_WORLD_PKTS_SENT;
            SERVER_TX_FRAMES.fetch_add(1, Ordering::Relaxed);
            SERVER_WORLD_PKTS_SENT.fetch_add(1, Ordering::Relaxed);
        }
    } else {
        warn!("Server Error: Cannot send data packet to {}", &self.address);
    }

    true
}
/// Serializes one data packet into a fresh `BitWriter` and returns it.
///
/// The next packet index is obtained from the base connection first. The
/// writer then receives the standard header, the current tick and the current
/// tick instant, after which the base connection writes the packet body,
/// consuming entries from `host_world_events` / `update_events` as it goes.
///
/// With the `e2e_debug` feature, the number of `SetAuthority(.., Granted)`
/// commands that left `host_world_events` during the write is added to
/// `SERVER_WROTE_SET_AUTH`.
#[allow(clippy::too_many_arguments)]
fn write_packet<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
    &mut self,
    channel_kinds: &ChannelKinds,
    message_kinds: &MessageKinds,
    component_kinds: &ComponentKinds,
    now: &Instant,
    world: &W,
    entity_converter: &dyn EntityAndGlobalEntityConverter<E>,
    global_world_manager: &GlobalWorldManager,
    time_manager: &TimeManager,
    host_world_events: &mut VecDeque<(MessageIndex, EntityCommand)>,
    update_events: &mut HashMap<GlobalEntity, HashSet<ComponentKind>>,
    entity_priority_order: Option<&[GlobalEntity]>,
) -> BitWriter {
    /// Counts the queued `SetAuthority` commands whose status is `Granted`.
    #[cfg(feature = "e2e_debug")]
    fn count_granted_authority(events: &VecDeque<(MessageIndex, EntityCommand)>) -> usize {
        events
            .iter()
            .filter(|(_, command)| {
                matches!(
                    command,
                    EntityCommand::SetAuthority(_, _, status)
                        if *status == EntityAuthStatus::Granted
                )
            })
            .count()
    }

    let next_packet_index = self.base.next_packet_index();

    let mut writer = BitWriter::new();
    writer.reserve_bits(3);
    self.base.write_header(PacketType::Data, &mut writer);
    time_manager.current_tick().ser(&mut writer);
    time_manager.current_tick_instant().ser(&mut writer);

    let mut has_written = false;

    #[cfg(feature = "e2e_debug")]
    let set_auth_granted_before = count_granted_authority(host_world_events);

    self.base.write_packet(
        channel_kinds,
        message_kinds,
        component_kinds,
        now,
        &mut writer,
        next_packet_index,
        world,
        entity_converter,
        global_world_manager,
        &mut has_written,
        true,
        host_world_events,
        update_events,
        entity_priority_order,
    );

    #[cfg(feature = "e2e_debug")]
    {
        // Commands no longer queued after the write are counted as written.
        let set_auth_granted_after = count_granted_authority(host_world_events);
        let written_count = set_auth_granted_before - set_auth_granted_after;
        if written_count > 0 {
            use crate::server::world_server::SERVER_WROTE_SET_AUTH;
            SERVER_WROTE_SET_AUTH.fetch_add(written_count, Ordering::Relaxed);
        }
    }

    writer
}
/// Delegates to the world manager's `process_delivered_commands()`.
pub fn process_received_commands(&mut self) {
self.base.world_manager.process_delivered_commands();
}
}