use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{Arc, RwLock},
time::Duration,
};
use log::warn;
use crate::{ComponentKind, DiffMask, GlobalEntity, GlobalWorldManagerType};
use crate::world::entity_index::{EntityIndex, KeyGenerator32};
use crate::world::update::global_diff_handler::GlobalDiffHandler;
use crate::world::update::mut_channel::{DirtyNotifier, DirtySet, MutReceiver};
const ENTITY_INDEX_RECYCLE_TIMEOUT: Duration = Duration::from_secs(2);
#[cfg(feature = "bench_instrumentation")]
pub mod dirty_scan_counters {
use std::sync::atomic::{AtomicU64, Ordering};
#[doc(hidden)] pub static SCAN_CALLS: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static RECEIVERS_VISITED: AtomicU64 = AtomicU64::new(0);
#[doc(hidden)] pub static DIRTY_RESULTS: AtomicU64 = AtomicU64::new(0);
pub fn reset() {
SCAN_CALLS.store(0, Ordering::Relaxed);
RECEIVERS_VISITED.store(0, Ordering::Relaxed);
DIRTY_RESULTS.store(0, Ordering::Relaxed);
}
pub fn snapshot() -> (u64, u64, u64) {
(
SCAN_CALLS.load(Ordering::Relaxed),
RECEIVERS_VISITED.load(Ordering::Relaxed),
DIRTY_RESULTS.load(Ordering::Relaxed),
)
}
}
#[derive(Clone)]
pub struct UserDiffHandler {
receivers: HashMap<(GlobalEntity, ComponentKind), MutReceiver>,
global_diff_handler: Arc<RwLock<GlobalDiffHandler>>,
entity_to_index: HashMap<GlobalEntity, EntityIndex>,
index_to_entity: Vec<Option<GlobalEntity>>,
components_per_entity: HashMap<EntityIndex, u32>,
key_gen: KeyGenerator32<EntityIndex>,
kinds_by_bit: Vec<Option<ComponentKind>>,
dirty_set: Arc<DirtySet>,
}
impl UserDiffHandler {
pub fn new(global_world_manager: &dyn GlobalWorldManagerType) -> Self {
let global_diff_handler = global_world_manager.diff_handler();
let kind_count = global_diff_handler
.read()
.map(|h| h.kind_count())
.unwrap_or(0);
Self {
receivers: HashMap::new(),
global_diff_handler,
entity_to_index: HashMap::new(),
index_to_entity: Vec::new(),
components_per_entity: HashMap::new(),
key_gen: KeyGenerator32::new(ENTITY_INDEX_RECYCLE_TIMEOUT),
kinds_by_bit: vec![None; kind_count as usize],
dirty_set: Arc::new(DirtySet::new(kind_count)),
}
}
fn allocate_entity_index(&mut self, entity: &GlobalEntity) -> EntityIndex {
if let Some(&idx) = self.entity_to_index.get(entity) {
return idx;
}
let idx = self.key_gen.generate();
let slot = idx.0 as usize;
if slot >= self.index_to_entity.len() {
self.index_to_entity.resize(slot + 1, None);
}
self.index_to_entity[slot] = Some(*entity);
self.entity_to_index.insert(*entity, idx);
self.dirty_set.ensure_capacity(slot);
idx
}
pub fn register_component(
&mut self,
address: &Option<SocketAddr>,
entity: &GlobalEntity,
component_kind: &ComponentKind,
) {
let Ok(global_handler) = self.global_diff_handler.as_ref().read() else {
panic!("Be sure you can get self.global_diff_handler before calling this!");
};
let Some(receiver) = global_handler.receiver(address, entity, component_kind) else {
#[cfg(feature = "e2e_debug")]
{
warn!(
"UserDiffHandler: Component {:?} for {:?} not yet registered in GlobalDiffHandler, skipping registration",
component_kind, entity
);
}
return;
};
let kind_bit = global_handler.kind_bit(component_kind);
drop(global_handler);
let Some(kind_bit) = kind_bit else {
warn!("UserDiffHandler: kind_bit unresolved for {:?}; notifier not attached", component_kind);
return;
};
let entity_idx = self.allocate_entity_index(entity);
let bit_idx = kind_bit as usize;
if bit_idx >= self.kinds_by_bit.len() {
self.kinds_by_bit.resize(bit_idx + 1, None);
}
if self.kinds_by_bit[bit_idx].is_none() {
self.kinds_by_bit[bit_idx] = Some(*component_kind);
}
receiver.attach_notifier(DirtyNotifier::new(
entity_idx,
kind_bit,
Arc::downgrade(&self.dirty_set),
));
self.receivers.insert((*entity, *component_kind), receiver);
*self.components_per_entity.entry(entity_idx).or_insert(0) += 1;
}
pub fn deregister_component(&mut self, entity: &GlobalEntity, component_kind: &ComponentKind) {
self.receivers.remove(&(*entity, *component_kind));
let Some(&entity_idx) = self.entity_to_index.get(entity) else {
return;
};
let Ok(global_handler) = self.global_diff_handler.as_ref().read() else {
return;
};
if let Some(kind_bit) = global_handler.kind_bit(component_kind) {
self.dirty_set.cancel(entity_idx, kind_bit);
}
drop(global_handler);
if let Some(count) = self.components_per_entity.get_mut(&entity_idx) {
*count = count.saturating_sub(1);
if *count == 0 {
self.components_per_entity.remove(&entity_idx);
self.entity_to_index.remove(entity);
if let Some(slot) = self.index_to_entity.get_mut(entity_idx.0 as usize) {
*slot = None;
}
self.key_gen.recycle_key(&entity_idx);
}
}
}
pub fn has_component(&self, entity: &GlobalEntity, component: &ComponentKind) -> bool {
self.receivers.contains_key(&(*entity, *component))
}
pub fn diff_mask_snapshot(
&self,
entity: &GlobalEntity,
component_kind: &ComponentKind,
) -> DiffMask {
let Some(receiver) = self.receivers.get(&(*entity, *component_kind)) else {
panic!("Should not call this unless we're sure there's a receiver");
};
receiver.mask_snapshot()
}
pub fn diff_mask_is_clear(
&self,
entity: &GlobalEntity,
component_kind: &ComponentKind,
) -> bool {
let Some(receiver) = self.receivers.get(&(*entity, *component_kind)) else {
warn!(
"diff_mask_is_clear(): Should not call this unless we're sure there's a receiver"
);
return true;
};
receiver.diff_mask_is_clear()
}
pub fn or_diff_mask(
&mut self,
entity: &GlobalEntity,
component_kind: &ComponentKind,
other_mask: &DiffMask,
) {
let Some(receiver) = self.receivers.get_mut(&(*entity, *component_kind)) else {
panic!("Should not call this unless we're sure there's a receiver");
};
receiver.or_mask(other_mask);
}
pub fn clear_diff_mask(&mut self, entity: &GlobalEntity, component_kind: &ComponentKind) {
let Some(receiver) = self.receivers.get_mut(&(*entity, *component_kind)) else {
panic!("Should not call this unless we're sure there's a receiver");
};
receiver.clear_mask();
}
#[cfg(feature = "test_utils")]
pub fn receiver_count(&self) -> usize {
self.receivers.len()
}
#[cfg(feature = "test_utils")]
pub fn dirty_candidates_count(&self) -> usize {
self.receivers.values().filter(|r| !r.diff_mask_is_clear()).count()
}
pub fn dirty_receiver_candidates(&self) -> HashMap<GlobalEntity, HashSet<ComponentKind>> {
let drained: Vec<(EntityIndex, Vec<u64>)> = self.dirty_set.drain();
for (idx, words) in &drained {
for (word_idx, &word) in words.iter().enumerate() {
let mut remaining = word;
while remaining != 0 {
let bit = remaining.trailing_zeros() as u16;
let kind_bit = (word_idx as u16) * 64 + bit;
self.dirty_set.push(*idx, kind_bit);
remaining &= remaining - 1;
}
}
}
let mut result: HashMap<GlobalEntity, HashSet<ComponentKind>> =
HashMap::with_capacity(drained.len());
for (idx, words) in drained {
let Some(Some(entity)) = self.index_to_entity.get(idx.0 as usize) else {
continue;
};
let mut set = HashSet::new();
for (word_idx, word) in words.into_iter().enumerate() {
let mut remaining = word;
while remaining != 0 {
let bit = remaining.trailing_zeros() as usize;
let absolute_bit = word_idx * 64 + bit;
if let Some(Some(kind)) = self.kinds_by_bit.get(absolute_bit) {
set.insert(*kind);
}
remaining &= remaining - 1;
}
}
if !set.is_empty() {
result.insert(*entity, set);
}
}
#[cfg(feature = "bench_instrumentation")]
{
use std::sync::atomic::Ordering;
dirty_scan_counters::SCAN_CALLS.fetch_add(1, Ordering::Relaxed);
let visited: u64 = result.values().map(|s| s.len() as u64).sum();
dirty_scan_counters::RECEIVERS_VISITED.fetch_add(visited, Ordering::Relaxed);
dirty_scan_counters::DIRTY_RESULTS.fetch_add(visited, Ordering::Relaxed);
}
result
}
}