use std::{
collections::{HashMap, HashSet, VecDeque},
hash::Hash,
net::SocketAddr,
time::Duration,
};
#[cfg(feature = "bench_instrumentation")]
pub mod bench_take_events_counters {
use std::sync::atomic::{AtomicU64, Ordering};
#[doc(hidden)] pub static NS_HOST_REMOTE_COMMANDS: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static NS_SENDER_COLLECT: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static NS_TAKE_UPDATE_EVENTS: AtomicU64 = AtomicU64::new(0);
pub fn reset() {
NS_HOST_REMOTE_COMMANDS.store(0, Ordering::Relaxed);
NS_SENDER_COLLECT.store(0, Ordering::Relaxed);
NS_TAKE_UPDATE_EVENTS.store(0, Ordering::Relaxed);
}
pub fn snapshot() -> (u64, u64, u64) {
(
NS_HOST_REMOTE_COMMANDS.load(Ordering::Relaxed),
NS_SENDER_COLLECT.load(Ordering::Relaxed),
NS_TAKE_UPDATE_EVENTS.load(Ordering::Relaxed),
)
}
}
#[cfg(feature = "bench_instrumentation")]
pub mod cmd_emission_counters {
use std::sync::atomic::{AtomicU64, Ordering};
#[doc(hidden)] pub static SPAWN: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static SPAWN_WITH_COMPONENTS: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static DESPAWN: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static INSERT_COMPONENT: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static REMOVE_COMPONENT: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static NOOP: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static OTHER: AtomicU64 = AtomicU64::new(0);
pub static PAYLOAD_COMPONENTS: AtomicU64 = AtomicU64::new(0);
pub fn reset() {
SPAWN.store(0, Ordering::Relaxed);
SPAWN_WITH_COMPONENTS.store(0, Ordering::Relaxed);
DESPAWN.store(0, Ordering::Relaxed);
INSERT_COMPONENT.store(0, Ordering::Relaxed);
REMOVE_COMPONENT.store(0, Ordering::Relaxed);
NOOP.store(0, Ordering::Relaxed);
OTHER.store(0, Ordering::Relaxed);
PAYLOAD_COMPONENTS.store(0, Ordering::Relaxed);
}
#[derive(Debug, Clone, Copy)]
pub struct CmdEmissionSnapshot {
pub spawn: u64,
pub spawn_with_components: u64,
pub despawn: u64,
pub insert_component: u64,
pub remove_component: u64,
pub noop: u64,
pub other: u64,
pub payload_components: u64,
}
pub fn snapshot() -> CmdEmissionSnapshot {
CmdEmissionSnapshot {
spawn: SPAWN.load(Ordering::Relaxed),
spawn_with_components: SPAWN_WITH_COMPONENTS.load(Ordering::Relaxed),
despawn: DESPAWN.load(Ordering::Relaxed),
insert_component: INSERT_COMPONENT.load(Ordering::Relaxed),
remove_component: REMOVE_COMPONENT.load(Ordering::Relaxed),
noop: NOOP.load(Ordering::Relaxed),
other: OTHER.load(Ordering::Relaxed),
payload_components: PAYLOAD_COMPONENTS.load(Ordering::Relaxed),
}
}
}
use log::info;
use naia_socket_shared::Instant;
use crate::world::sync::RemoteEntityChannel;
use crate::world::update::entity_update_manager::EntityUpdateManager;
use crate::{
messages::channels::receivers::reliable_receiver::ReliableReceiver,
sequence_list::SequenceList,
types::{HostType, PacketIndex},
world::{
entity::entity_converters::GlobalWorldManagerType,
host::host_world_manager::{CommandId, HostWorldManager},
remote::remote_entity_waitlist::{RemoteEntityWaitlist, WaitlistStore},
sync::HostEntityChannel,
},
ChannelSender, ComponentKind, ComponentKinds, ComponentUpdate, DiffMask,
EntityAndGlobalEntityConverter, EntityAuthStatus, EntityCommand, EntityConverterMut,
EntityEvent, EntityMessage, EntityMessageType, GlobalEntity, GlobalEntitySpawner, HostEntity,
InScopeEntities, LocalEntityAndGlobalEntityConverter, LocalEntityMap, MessageIndex,
OwnedLocalEntity, PacketNotifiable, ReliableSender, RemoteEntity, RemoteWorldManager,
Replicate, Tick, WorldMutType, WorldRefType,
};
cfg_if! {
if #[cfg(feature = "e2e_debug")] {
use crate::world::{
host::host_world_manager::SubCommandId,
sync::remote_entity_channel::EntityChannelState,
};
}
}
const RESEND_COMMAND_RTT_FACTOR: f32 = 1.5;
const COMMAND_RECORD_TTL: Duration = Duration::from_secs(60);
type SentCommandPackets = SequenceList<(Instant, Vec<(CommandId, EntityMessage<OwnedLocalEntity>)>)>;
type OutgoingEvents = (VecDeque<(CommandId, EntityCommand)>, HashMap<GlobalEntity, HashSet<ComponentKind>>);
pub struct LocalWorldManager {
entity_map: LocalEntityMap,
sender: ReliableSender<EntityCommand>,
sent_command_packets: SentCommandPackets,
receiver: ReliableReceiver<EntityMessage<OwnedLocalEntity>>,
host: HostWorldManager,
remote: RemoteWorldManager,
updater: EntityUpdateManager,
paused_entities: HashSet<GlobalEntity>,
incoming_components: HashMap<(OwnedLocalEntity, ComponentKind), Box<dyn Replicate>>,
incoming_updates: Vec<(Tick, OwnedLocalEntity, ComponentUpdate)>,
}
impl LocalWorldManager {
pub fn new(
address: &Option<SocketAddr>,
host_type: HostType,
user_key: u64,
global_world_manager: &dyn GlobalWorldManagerType,
) -> Self {
Self {
entity_map: LocalEntityMap::new(host_type),
sender: ReliableSender::new(RESEND_COMMAND_RTT_FACTOR, None),
sent_command_packets: SequenceList::new(),
receiver: ReliableReceiver::new(),
host: HostWorldManager::new(host_type, user_key),
remote: RemoteWorldManager::new(host_type),
updater: EntityUpdateManager::new(address, global_world_manager),
paused_entities: HashSet::new(),
incoming_components: HashMap::new(),
incoming_updates: Vec::new(),
}
}
pub(crate) fn entity_waitlist_queue<T>(
&mut self,
remote_entity_set: &HashSet<RemoteEntity>,
waitlist_store: &mut WaitlistStore<T>,
message: T,
) {
self.remote
.entity_waitlist_queue(remote_entity_set, waitlist_store, message);
}
pub fn entity_converter(&self) -> &dyn LocalEntityAndGlobalEntityConverter {
self.entity_map.entity_converter()
}
pub fn entity_converter_mut<'a, 'b>(
&'b mut self,
global_world_manager: &'a dyn GlobalWorldManagerType,
) -> EntityConverterMut<'a, 'b> {
self.host
.entity_converter_mut(global_world_manager, &mut self.entity_map)
}
pub fn has_global_entity(&self, global_entity: &GlobalEntity) -> bool {
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
return false;
};
self.has_local_entity(&local_entity)
}
pub fn has_local_entity(&self, local_entity: &OwnedLocalEntity) -> bool {
match local_entity {
OwnedLocalEntity::Host { id, is_static: true } => {
self.host.has_entity(&HostEntity::new_static(*id))
}
OwnedLocalEntity::Host { id, is_static: false } => {
self.host.has_entity(&HostEntity::new(*id))
}
OwnedLocalEntity::Remote { id, is_static } => {
let remote = if *is_static { RemoteEntity::new_static(*id) } else { RemoteEntity::new(*id) };
self.remote.has_entity(&remote)
}
}
}
pub fn get_host_entity_channel(
&self,
entity: &HostEntity,
) -> Option<&crate::world::sync::HostEntityChannel> {
self.host.get_entity_channel(entity)
}
pub fn get_host_entity_channel_mut(
&mut self,
entity: &HostEntity,
) -> Option<&mut crate::world::sync::HostEntityChannel> {
self.host.get_entity_channel_mut(entity)
}
pub fn has_host_entity(&self, host_entity: &HostEntity) -> bool {
self.host.has_entity(host_entity)
}
pub fn host_init_entity(
&mut self,
global_entity: &GlobalEntity,
component_kinds: Vec<ComponentKind>,
component_kinds_map: &ComponentKinds,
is_static: bool,
) {
if let Ok(existing_host_entity) =
self.entity_map.global_entity_to_host_entity(global_entity)
{
let channel_alive = self
.host
.get_host_world()
.contains_key(&existing_host_entity);
if !channel_alive {
self.entity_map.remove_by_global_entity(global_entity);
}
}
if self
.entity_map
.global_entity_to_host_entity(global_entity)
.is_err()
{
if is_static {
let host_entity = self.host.host_generate_static_entity();
self.entity_map
.insert_with_static_host_entity(*global_entity, host_entity);
} else {
let host_entity = self.host.host_generate_entity();
self.entity_map
.insert_with_host_entity(*global_entity, host_entity);
}
}
if is_static {
self.host.init_static_entity_send_host_commands(
&self.entity_map,
global_entity,
component_kinds,
);
} else {
self.host.init_entity_send_host_commands(
&self.entity_map,
global_entity,
component_kinds,
&mut self.updater,
component_kinds_map,
);
}
}
pub fn migrate_entity_remote_to_host(
&mut self,
global_entity: &GlobalEntity,
) -> Result<HostEntity, String> {
let Some(local_entity_record) = self.entity_map.remove_by_global_entity(global_entity)
else {
return Err(format!(
"Entity does not exist in local entity map: {:?}",
global_entity
));
};
if !local_entity_record.is_remote_owned() {
self.entity_map
.insert_with_remote_entity(*global_entity, local_entity_record.remote_entity());
return Err(format!("Entity is not remote-owned: {:?}", global_entity));
}
let old_remote_entity = local_entity_record.remote_entity();
let new_host_entity = self.host.host_generate_entity();
self.entity_map
.insert_with_host_entity(*global_entity, new_host_entity);
self.entity_map
.remove_remote_mapping_if_exists(&old_remote_entity);
debug_assert!(
self.entity_map
.entity_converter()
.global_entity_to_remote_entity(global_entity)
.is_err(),
"After migration, global_entity_to_remote_entity must fail for migrated entity"
);
self.remote.force_drain_entity_buffers(&old_remote_entity);
let component_kinds = self.remote.extract_component_kinds(&old_remote_entity);
let _old_remote_channel = self.remote.remove_entity_channel(&old_remote_entity);
let new_host_channel =
HostEntityChannel::new_with_components(self.entity_map.host_type(), component_kinds);
self.host
.insert_entity_channel(new_host_entity, new_host_channel);
let old_entity = old_remote_entity.copy_to_owned();
let new_entity = OwnedLocalEntity::Host { id: new_host_entity.value(), is_static: false };
self.entity_map
.install_entity_redirect(old_entity, new_entity);
self.update_sent_command_entity_refs(global_entity, old_entity, new_entity);
self.remote
.despawn_entity(&mut self.entity_map, &old_remote_entity);
Ok(new_host_entity)
}
pub fn host_send_enable_delegation(&mut self, global_entity: &GlobalEntity) {
let command = EntityCommand::EnableDelegation(None, *global_entity);
self.host.send_command(&self.entity_map, command);
}
pub fn host_local_enable_delegation(&mut self, host_entity: &HostEntity) {
let Some(channel) = self.host.get_entity_channel_mut(host_entity) else {
panic!(
"Cannot enable delegation on non-existent HostEntity: {:?}",
host_entity
);
};
channel.local_enable_delegation();
}
pub fn host_send_migrate_response(
&mut self,
global_entity: &GlobalEntity,
old_remote_entity: &RemoteEntity, new_host_entity: &HostEntity, ) {
let command = EntityCommand::MigrateResponse(
None,
*global_entity,
*old_remote_entity,
*new_host_entity,
);
self.host.send_command(&self.entity_map, command);
}
#[track_caller]
pub fn host_send_set_auth(
&mut self,
global_entity: &GlobalEntity,
auth_status: EntityAuthStatus,
) {
#[cfg(feature = "e2e_debug")]
{
crate::e2e_trace!(
"[SERVER_SEND] SetAuthority entity={:?} status={:?}",
global_entity,
auth_status
);
}
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
panic!("Attempting to send SetAuthority for entity which does not exist in local entity map! {:?}", global_entity);
};
let command = EntityCommand::SetAuthority(None, *global_entity, auth_status);
if local_entity.is_host() {
self.host.send_command(&self.entity_map, command);
} else {
self.remote
.send_auth_command(self.entity_map.entity_converter(), command);
}
}
pub fn host_reserve_entity(&mut self, global_entity: &GlobalEntity) -> HostEntity {
self.host
.host_reserve_entity(&mut self.entity_map, global_entity)
}
pub fn host_remove_reserved_entity(
&mut self,
global_entity: &GlobalEntity,
) -> Option<HostEntity> {
self.host.host_removed_reserved_entity(global_entity)
}
pub(crate) fn insert_sent_command_packet(&mut self, packet_index: &PacketIndex, now: Instant) {
if !self
.sent_command_packets
.contains_scan_from_back(packet_index)
{
self.sent_command_packets
.insert_scan_from_back(*packet_index, (now, Vec::new()));
}
}
pub(crate) fn record_command_written(
&mut self,
packet_index: &PacketIndex,
command_id: &CommandId,
message: EntityMessage<OwnedLocalEntity>,
) {
#[cfg(feature = "bench_instrumentation")]
{
use std::sync::atomic::Ordering;
match &message {
EntityMessage::Spawn(_) => {
cmd_emission_counters::SPAWN.fetch_add(1, Ordering::Relaxed);
}
EntityMessage::SpawnWithComponents(_, kinds) => {
cmd_emission_counters::SPAWN_WITH_COMPONENTS
.fetch_add(1, Ordering::Relaxed);
cmd_emission_counters::PAYLOAD_COMPONENTS
.fetch_add(kinds.len() as u64, Ordering::Relaxed);
}
EntityMessage::Despawn(_) => {
cmd_emission_counters::DESPAWN.fetch_add(1, Ordering::Relaxed);
}
EntityMessage::InsertComponent(_, _) => {
cmd_emission_counters::INSERT_COMPONENT.fetch_add(1, Ordering::Relaxed);
}
EntityMessage::RemoveComponent(_, _) => {
cmd_emission_counters::REMOVE_COMPONENT.fetch_add(1, Ordering::Relaxed);
}
EntityMessage::Noop => {
cmd_emission_counters::NOOP.fetch_add(1, Ordering::Relaxed);
}
_ => {
cmd_emission_counters::OTHER.fetch_add(1, Ordering::Relaxed);
}
}
}
let (_, sent_actions_list) = self
.sent_command_packets
.get_mut_scan_from_back(packet_index)
.unwrap();
sent_actions_list.push((*command_id, message));
}
pub fn remote_entities(&self) -> Vec<GlobalEntity> {
self.entity_map.remote_entities()
}
#[cfg(feature = "e2e_debug")]
pub fn debug_remote_channel_diagnostic(
&self,
remote_entity: &RemoteEntity,
) -> Option<(
EntityChannelState,
(SubCommandId, usize, Option<SubCommandId>, usize),
)> {
self.remote.debug_channel_diagnostic(remote_entity)
}
#[cfg(feature = "e2e_debug")]
pub fn debug_remote_channel_snapshot(
&self,
remote_entity: &RemoteEntity,
) -> Option<(
EntityChannelState,
Option<MessageIndex>,
usize,
Option<(MessageIndex, EntityMessageType)>,
Option<MessageIndex>,
)> {
self.remote.debug_channel_snapshot(remote_entity)
}
pub fn send_enable_delegation_response(&mut self, global_entity: &GlobalEntity) {
let command = EntityCommand::EnableDelegationResponse(None, *global_entity);
self.remote.send_auth_command(&self.entity_map, command);
}
pub fn remote_send_request_auth(&mut self, global_entity: &GlobalEntity) {
let command = EntityCommand::RequestAuthority(None, *global_entity);
self.remote.send_auth_command(&self.entity_map, command);
}
pub fn remote_receive_set_auth(
&mut self,
global_entity: &GlobalEntity,
auth_status: EntityAuthStatus,
) {
let remote_entity = self
.entity_map
.entity_converter()
.global_entity_to_remote_entity(global_entity)
.unwrap();
self.remote
.receive_set_auth_status(remote_entity, auth_status);
}
pub fn get_remote_entity_auth_status(
&self,
global_entity: &GlobalEntity,
) -> Option<EntityAuthStatus> {
let Ok(owned) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
return None;
};
let OwnedLocalEntity::Remote { .. } = owned else {
return None;
};
self.remote
.get_entity_auth_status(&owned.take_remote())
}
pub fn entity_waitlist_mut(&mut self) -> &mut RemoteEntityWaitlist {
self.remote.entity_waitlist_mut()
}
pub fn receiver_buffer_message(
&mut self,
id: MessageIndex,
msg: EntityMessage<OwnedLocalEntity>,
) {
self.receiver.buffer_message(id, msg);
}
pub(crate) fn insert_received_component(
&mut self,
local_entity: &OwnedLocalEntity,
component_kind: &ComponentKind,
component: Box<dyn Replicate>,
) {
self.incoming_components
.insert((*local_entity, *component_kind), component);
}
pub(crate) fn insert_received_update(
&mut self,
tick: Tick,
local_entity: &OwnedLocalEntity,
component_update: ComponentUpdate,
) {
self.incoming_updates
.push((tick, *local_entity, component_update));
}
pub fn take_incoming_events<E: Copy + Eq + Hash + Send + Sync, W: WorldMutType<E>>(
&mut self,
spawner: &mut dyn GlobalEntitySpawner<E>,
global_world_manager: &dyn GlobalWorldManagerType,
component_kinds: &ComponentKinds,
world: &mut W,
now: &Instant,
) -> Vec<EntityEvent> {
let incoming_messages = self.receiver.receive_messages();
let mut incoming_host_messages = Vec::new();
let mut incoming_remote_messages = Vec::new();
for (id, incoming_message) in incoming_messages {
if incoming_message.get_type() == EntityMessageType::Noop {
continue; }
let Some(local_entity) = incoming_message.entity() else {
panic!(
"Received message without an entity! Message: {:?}",
incoming_message
);
};
match local_entity {
OwnedLocalEntity::Host { id: host_entity, is_static } => {
let host_entity = if is_static { HostEntity::new_static(host_entity) } else { HostEntity::new(host_entity) };
incoming_host_messages.push((id, incoming_message.with_entity(host_entity)));
}
OwnedLocalEntity::Remote { .. } => {
let remote_entity = local_entity.take_remote();
#[cfg(feature = "e2e_debug")]
if incoming_message.get_type() == EntityMessageType::Spawn {
extern "Rust" {
fn client_routed_remote_spawn_increment();
}
unsafe {
client_routed_remote_spawn_increment();
}
}
incoming_remote_messages
.push((id, incoming_message.with_entity(remote_entity)));
}
}
}
let host_events = self.host.take_incoming_events(
spawner,
global_world_manager,
&self.entity_map,
world,
incoming_host_messages,
);
let mut remote_events = self.remote.take_incoming_events(
spawner,
global_world_manager,
&mut self.entity_map,
component_kinds,
world,
now,
&mut self.incoming_components,
std::mem::take(&mut self.incoming_updates),
incoming_remote_messages,
);
let mut incoming_events = host_events;
incoming_events.append(&mut remote_events);
incoming_events
}
pub fn register_authed_entity(
&mut self,
global_manager: &dyn GlobalWorldManagerType,
global_entity: &GlobalEntity,
) {
if let Ok(remote_entity) = self
.entity_map
.global_entity_to_remote_entity(global_entity)
{
self.remote.register_authed_entity(&remote_entity);
}
let Some(component_kinds) = global_manager.component_kinds(global_entity) else {
return;
};
for component_kind in component_kinds.iter() {
self.updater
.register_component(global_entity, component_kind);
}
}
pub fn deregister_authed_entity(
&mut self,
global_manager: &dyn GlobalWorldManagerType,
global_entity: &GlobalEntity,
) {
if let Ok(remote_entity) = self
.entity_map
.global_entity_to_remote_entity(global_entity)
{
self.remote.deregister_authed_entity(&remote_entity);
}
let Some(component_kinds) = global_manager.component_kinds(global_entity) else {
return;
};
for component_kind in component_kinds.iter() {
self.updater
.deregister_component(global_entity, component_kind);
}
}
pub fn remote_spawn_entity(&mut self, global_entity: &GlobalEntity) {
let remote_entity = self
.entity_map
.global_entity_to_remote_entity(global_entity)
.unwrap();
self.remote.spawn_entity(&remote_entity);
}
pub fn remote_despawn_entity(&mut self, global_entity: &GlobalEntity) {
let remote_entity = self
.entity_map
.global_entity_to_remote_entity(global_entity)
.unwrap();
self.remote
.despawn_entity(&mut self.entity_map, &remote_entity);
}
pub(crate) fn get_diff_mask(
&self,
global_entity: &GlobalEntity,
component_kind: &ComponentKind,
) -> DiffMask {
self.updater.get_diff_mask(global_entity, component_kind)
}
pub(crate) fn record_update(
&mut self,
now: &Instant,
packet_index: &PacketIndex,
global_entity: &GlobalEntity,
component_kind: &ComponentKind,
diff_mask: DiffMask,
) {
self.updater
.record_update(now, packet_index, global_entity, component_kind, diff_mask);
}
pub fn despawn_entity(&mut self, global_entity: &GlobalEntity) {
self.paused_entities.remove(global_entity);
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
panic!(
"Attempting to despawn entity which does not exist in local entity map! {:?}",
global_entity
);
};
if local_entity.is_host() {
self.host
.send_command(&self.entity_map, EntityCommand::Despawn(*global_entity));
} else {
self.remote
.send_entity_command(&self.entity_map, EntityCommand::Despawn(*global_entity));
}
}
pub fn pause_entity(&mut self, global_entity: &GlobalEntity) {
self.paused_entities.insert(*global_entity);
}
pub fn resume_entity(&mut self, global_entity: &GlobalEntity) {
self.paused_entities.remove(global_entity);
}
pub fn is_entity_paused(&self, global_entity: &GlobalEntity) -> bool {
self.paused_entities.contains(global_entity)
}
pub fn insert_component(
&mut self,
global_entity: &GlobalEntity,
component_kind: &ComponentKind,
) {
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
panic!("Attempting to insert component for entity which does not exist in local entity map! {:?}", global_entity);
};
if local_entity.is_host() {
self.updater
.register_component(global_entity, component_kind);
self.host.send_command(
&self.entity_map,
EntityCommand::InsertComponent(*global_entity, *component_kind),
);
} else {
self.remote.send_entity_command(
&self.entity_map,
EntityCommand::InsertComponent(*global_entity, *component_kind),
);
}
}
pub fn remove_component(
&mut self,
global_entity: &GlobalEntity,
component_kind: &ComponentKind,
) {
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
panic!("Attempting to remove component for entity which does not exist in local entity map! {:?}", global_entity);
};
if local_entity.is_host() {
self.host.send_command(
&self.entity_map,
EntityCommand::RemoveComponent(*global_entity, *component_kind),
);
} else {
self.remote.send_entity_command(
&self.entity_map,
EntityCommand::RemoveComponent(*global_entity, *component_kind),
);
}
}
pub fn send_publish(&mut self, host_type: HostType, global_entity: &GlobalEntity) {
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
panic!(
"Attempting to publish entity which does not exist in local entity map! {:?}",
global_entity
);
};
let host_owned = match (host_type, local_entity.is_host()) {
(HostType::Server, true) => {
panic!("Server-owned Entities are published by default, invalid!")
}
(HostType::Client, false) => {
panic!("Server-owned Entities are published by default, invalid!")
}
(HostType::Server, false) => false, (HostType::Client, true) => true, };
let command = EntityCommand::Publish(None, *global_entity);
if host_owned {
self.host.send_command(&self.entity_map, command);
} else {
self.remote
.send_auth_command(self.entity_map.entity_converter(), command);
}
}
pub fn send_unpublish(&mut self, host_type: HostType, global_entity: &GlobalEntity) {
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
panic!(
"Attempting to publish entity which does not exist in local entity map! {:?}",
global_entity
);
};
let host_owned = match (host_type, local_entity.is_host()) {
(HostType::Server, true) => panic!("Server-owned Entities cannot be unpublished!"),
(HostType::Client, false) => panic!("Server-owned Entities cannot be unpublished!"),
(HostType::Server, false) => false, (HostType::Client, true) => true, };
let command = EntityCommand::Unpublish(None, *global_entity);
if host_owned {
self.host.send_command(&self.entity_map, command);
} else {
self.remote
.send_auth_command(self.entity_map.entity_converter(), command);
}
}
pub fn send_enable_delegation(
&mut self,
host_type: HostType,
origin_is_owning_client: bool,
global_entity: &GlobalEntity,
) {
let Ok(local_entity) = self.entity_map.global_entity_to_owned_entity(global_entity) else {
panic!("Attempting to enable delegation for entity which does not exist in local entity map! {:?}", global_entity);
};
let host_owned = match (host_type, local_entity.is_host(), origin_is_owning_client) {
(HostType::Server, false, true) => {
panic!("Client cannot originate enable delegation for ANOTHER client-owned entity!")
}
(HostType::Client, _, false) => {
panic!("Client must be the owning client to enable delegation!")
}
(HostType::Client, false, true) => {
panic!("Client cannot enable delegation for a Server-owned entity")
}
(HostType::Server, true, true) => true, (HostType::Server, true, false) => true, (HostType::Client, true, true) => true, (HostType::Server, false, false) => false, };
if host_owned {
let host_entity = self
.entity_map
.global_entity_to_host_entity(global_entity)
.expect("Host entity should exist");
let is_published = if let Some(channel) = self.get_host_entity_channel(&host_entity) {
use crate::world::sync::auth_channel::EntityAuthChannelState;
let state = channel.auth_channel_state();
state == EntityAuthChannelState::Published
|| state == EntityAuthChannelState::Delegated
} else {
false
};
if !is_published {
let publish_command = EntityCommand::Publish(None, *global_entity);
self.host.send_command(&self.entity_map, publish_command);
}
#[cfg(feature = "e2e_debug")]
crate::e2e_trace!(
"[SERVER_SEND] EnableDelegation entity={:?} callsite=send_enable_delegation(host)",
global_entity
);
let enable_delegation_command = EntityCommand::EnableDelegation(None, *global_entity);
self.host
.send_command(&self.entity_map, enable_delegation_command);
} else {
#[cfg(feature = "e2e_debug")]
crate::e2e_trace!("[SERVER_SEND] EnableDelegation entity={:?} callsite=send_enable_delegation(remote)", global_entity);
let command = EntityCommand::EnableDelegation(None, *global_entity);
self.remote
.send_auth_command(self.entity_map.entity_converter(), command);
}
}
#[track_caller]
pub fn send_disable_delegation(&mut self, global_entity: &GlobalEntity) {
#[cfg(feature = "e2e_debug")]
{
let caller = std::panic::Location::caller();
crate::e2e_trace!(
"[SERVER_SEND] DisableDelegation entity={:?} caller={}:{}",
global_entity,
caller.file(),
caller.line()
);
}
let command = EntityCommand::DisableDelegation(None, *global_entity);
self.host.send_command(&self.entity_map, command);
}
pub fn remote_send_release_auth(&mut self, global_entity: &GlobalEntity) {
let command = EntityCommand::ReleaseAuthority(None, *global_entity);
let host_owned = self
.entity_map
.global_entity_to_owned_entity(global_entity)
.unwrap()
.is_host();
if host_owned {
self.host.send_command(&self.entity_map, command);
} else {
self.remote
.send_auth_command(self.entity_map.entity_converter(), command);
}
}
pub fn collect_messages(&mut self, now: &Instant, rtt_millis: &f32) {
self.handle_dropped_command_packets(now);
self.updater.handle_dropped_update_packets(now, rtt_millis);
}
fn handle_dropped_command_packets(&mut self, now: &Instant) {
while let Some((_, (time_sent, _))) = self.sent_command_packets.front() {
if time_sent.elapsed(now) > COMMAND_RECORD_TTL {
self.sent_command_packets.pop_front();
} else {
break;
}
}
self.entity_map.cleanup_old_redirects(now, 60);
}
pub fn take_outgoing_events<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
&mut self,
now: &Instant,
rtt_millis: &f32,
world: &W,
converter: &dyn EntityAndGlobalEntityConverter<E>,
global_world_manager: &dyn GlobalWorldManagerType,
) -> OutgoingEvents {
#[cfg(feature = "bench_instrumentation")]
let t = std::time::Instant::now();
let host_commands = self.host.take_outgoing_commands();
let remote_commands = self.remote.take_outgoing_commands();
for commands in [host_commands, remote_commands] {
for command in commands {
self.sender.send_message(command);
}
}
#[cfg(feature = "bench_instrumentation")]
{
use std::sync::atomic::Ordering;
bench_take_events_counters::NS_HOST_REMOTE_COMMANDS
.fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
}
#[cfg(feature = "bench_instrumentation")]
let t = std::time::Instant::now();
self.sender.collect_messages(now, rtt_millis);
let world_commands = self.sender.take_next_messages();
#[cfg(feature = "bench_instrumentation")]
{
use std::sync::atomic::Ordering;
bench_take_events_counters::NS_SENDER_COLLECT
.fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
}
#[cfg(feature = "bench_instrumentation")]
let t = std::time::Instant::now();
let update_events = self.take_update_events(world, converter, global_world_manager);
#[cfg(feature = "bench_instrumentation")]
{
use std::sync::atomic::Ordering;
bench_take_events_counters::NS_TAKE_UPDATE_EVENTS
.fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
}
(world_commands, update_events)
}
pub fn process_delivered_commands(&mut self) {
self.host
.process_delivered_commands(&mut self.entity_map, &mut self.updater);
}
pub fn take_update_events<E: Copy + Eq + Hash + Send + Sync, W: WorldRefType<E>>(
&mut self,
world: &W,
world_converter: &dyn EntityAndGlobalEntityConverter<E>,
global_world_manager: &dyn GlobalWorldManagerType,
) -> HashMap<GlobalEntity, HashSet<ComponentKind>> {
let dirty = self.updater.build_dirty_candidates_from_receivers();
let local_converter = self.entity_map.entity_converter();
let mut updatable_world: HashMap<GlobalEntity, HashSet<ComponentKind>> = HashMap::new();
for (global_entity, kinds) in dirty {
if self.paused_entities.contains(&global_entity) {
continue;
}
for kind in kinds {
if self.host.is_component_updatable(local_converter, &global_entity, &kind)
|| self.remote.is_component_updatable(local_converter, &global_entity, &kind)
{
updatable_world.entry(global_entity).or_default().insert(kind);
}
}
}
self.updater.take_outgoing_events(world, world_converter, global_world_manager, updatable_world)
}
pub fn get_message_processor_helpers(
&mut self,
) -> (
&dyn LocalEntityAndGlobalEntityConverter,
&mut RemoteEntityWaitlist,
) {
let entity_converter = self.entity_map.entity_converter();
let entity_waitlist = self.remote.entity_waitlist_mut();
(entity_converter, entity_waitlist)
}
fn host_notify_packet_delivered(&mut self, packet_index: PacketIndex) {
if let Some((_, command_list)) = self
.sent_command_packets
.remove_scan_from_front(&packet_index)
{
for (command_id, command) in command_list {
if self.sender.deliver_message(&command_id).is_some() {
self.deliver_message(command_id, command);
}
}
}
}
fn deliver_message(&mut self, id: CommandId, msg: EntityMessage<OwnedLocalEntity>) {
if msg.is_noop() {
return;
}
let Some(local_entity) = msg.entity() else {
panic!("Delivered message without an entity! Message: {:?}", msg);
};
match local_entity {
OwnedLocalEntity::Host { id: host_entity, is_static } => {
let host_entity = if is_static { HostEntity::new_static(host_entity) } else { HostEntity::new(host_entity) };
self.host.deliver_message(id, msg.with_entity(host_entity));
}
OwnedLocalEntity::Remote { .. } => {
let remote_entity = local_entity.take_remote();
self.remote
.deliver_message(id, msg.with_entity(remote_entity));
}
}
}
pub fn update_sent_command_entity_refs(
&mut self,
_global_entity: &GlobalEntity,
old_entity: OwnedLocalEntity,
new_entity: OwnedLocalEntity,
) {
for (_, (_, commands)) in self.sent_command_packets.iter_mut() {
for (_, message) in commands.iter_mut() {
if let Some(entity) = message.entity() {
if entity == old_entity {
*message = message.clone().with_entity(new_entity);
}
}
}
}
}
pub fn extract_host_entity_commands(
&mut self,
global_entity: &GlobalEntity,
) -> Vec<EntityCommand> {
let host_entity = self
.entity_map
.global_entity_to_host_entity(global_entity)
.unwrap();
self.host.extract_entity_commands(&host_entity)
}
pub fn extract_host_component_kinds(
&self,
global_entity: &GlobalEntity,
) -> HashSet<ComponentKind> {
let host_entity = self
.entity_map
.global_entity_to_host_entity(global_entity)
.unwrap();
let channel = self.host.get_entity_channel(&host_entity).unwrap();
channel.component_kinds().clone()
}
pub fn remove_host_entity(&mut self, global_entity: &GlobalEntity) {
let host_entity = self
.entity_map
.global_entity_to_host_entity(global_entity)
.unwrap();
self.host.remove_entity_channel(&host_entity);
self.entity_map.remove_by_global_entity(global_entity);
}
pub fn insert_remote_entity(
&mut self,
global_entity: &GlobalEntity,
remote_entity: RemoteEntity,
component_kinds: HashSet<ComponentKind>,
) {
self.entity_map
.insert_with_remote_entity(*global_entity, remote_entity);
if self.remote.has_entity_channel(&remote_entity) {
info!(
"RemoteEntity({:?}) channel already exists (likely from out-of-order SetAuthority). Upgrading to Delegated.",
remote_entity
);
let channel = self.remote.get_entity_channel_mut(&remote_entity).unwrap();
channel.configure_as_delegated();
channel.set_spawned(0);
for component_kind in component_kinds {
channel.insert_component_channel_as_inserted(component_kind, 0);
}
} else {
let mut channel = RemoteEntityChannel::new_delegated(self.entity_map.host_type());
channel.set_spawned(0);
for component_kind in component_kinds {
channel.insert_component_channel_as_inserted(component_kind, 0);
}
self.remote.insert_entity_channel(remote_entity, channel);
}
}
pub fn install_entity_redirect(&mut self, old: OwnedLocalEntity, new: OwnedLocalEntity) {
self.entity_map.install_entity_redirect(old, new);
}
pub fn apply_entity_redirect(&self, entity: OwnedLocalEntity) -> OwnedLocalEntity {
self.entity_map.apply_entity_redirect(&entity)
}
pub fn replay_entity_command(&mut self, global_entity: &GlobalEntity, command: EntityCommand) {
let _remote_entity = self
.entity_map
.global_entity_to_remote_entity(global_entity)
.unwrap();
self.remote.send_entity_command(&self.entity_map, command);
}
}
impl PacketNotifiable for LocalWorldManager {
fn notify_packet_delivered(&mut self, packet_index: PacketIndex) {
self.host_notify_packet_delivered(packet_index);
self.updater.notify_packet_delivered(packet_index);
}
}
cfg_if! {
if #[cfg(feature = "interior_visibility")] {
use crate::LocalEntity;
impl LocalWorldManager {
pub fn local_entities(&self) -> Vec<LocalEntity> {
self.entity_map
.iter()
.map(|(_, record)| LocalEntity::from(record.owned_entity()))
.collect::<Vec<LocalEntity>>()
}
}
}
}
#[cfg(feature = "test_utils")]
impl LocalWorldManager {
#[doc(hidden)]
pub fn diff_handler_receiver_count(&self) -> usize {
self.updater.diff_handler_receiver_count()
}
#[doc(hidden)]
pub fn dirty_update_count(&self) -> usize {
self.updater.dirty_candidates_len()
}
}