use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock, RwLock, Weak,
},
};
use parking_lot::{Mutex as PlMutex, RwLock as PlRwLock};
use crate::world::entity_index::EntityIndex;
use crate::world::update::atomic_diff_mask::AtomicDiffMask;
use crate::{DiffMask, GlobalWorldManagerType, PropertyMutate};
/// Queue of entities that have at least one dirty component kind.
///
/// Dirty state is kept as a flat bitset: every entity owns `stride`
/// consecutive 64-bit words in `bits`, and the bit for kind `k` of entity `e`
/// lives in word `e * stride + k / 64`, at bit position `k % 64`.
/// `indices` lists the entities that a later `drain` call should visit.
pub struct DirtyQueue {
// Flat per-entity bit words; the lock guards growth of the vector, the
// individual words are updated atomically under a read lock.
bits: PlRwLock<Vec<AtomicU64>>,
// Number of 64-bit words reserved per entity (always >= 1).
stride: usize,
// Entities queued for the next drain. May contain the same entity more than
// once; `drain` skips entries whose words are already all zero.
indices: PlMutex<Vec<EntityIndex>>,
}
impl DirtyQueue {
    /// Creates an empty queue able to track `kind_count` component kinds.
    ///
    /// Each entity is given `ceil(kind_count / 64)` words (at least one).
    /// No per-entity storage is allocated here; it is grown on demand by
    /// [`Self::ensure_capacity`] and [`Self::push`].
    pub fn new(kind_count: u16) -> Self {
        let stride = usize::from(kind_count).div_ceil(64).max(1);
        Self {
            bits: PlRwLock::new(Vec::new()),
            stride,
            indices: PlMutex::new(Vec::new()),
        }
    }

    /// Number of 64-bit words reserved per entity.
    pub fn stride(&self) -> usize {
        self.stride
    }

    /// Grows the bit storage so that entity slot `slot` has all of its
    /// `stride` words available. Never shrinks the storage.
    pub fn ensure_capacity(&self, slot: usize) {
        let needed = (slot + 1) * self.stride;
        // Fast path: only a read lock when the storage is already big enough.
        if self.bits.read().len() >= needed {
            return;
        }
        let mut words = self.bits.write();
        // Re-check under the write lock: another thread may have grown the
        // vector past `needed` in the meantime, and `resize_with` would
        // truncate it if called unconditionally.
        if words.len() < needed {
            words.resize_with(needed, || AtomicU64::new(0));
        }
    }

    /// Marks kind `kind_bit` of `entity_idx` as dirty and queues the entity
    /// for the next [`Self::drain`].
    ///
    /// Storage is grown automatically when the entity has no words yet.
    /// `kind_bit` is expected to be below `stride * 64`; a larger value
    /// addresses words that belong to a following entity.
    ///
    /// The entity index is queued every time the targeted word goes from
    /// zero to non-zero. Inspecting the entity's *other* words to decide
    /// whether the entity is "already queued" is racy: two threads dirtying
    /// different words can each observe the other's bit and both skip the
    /// enqueue, and a push that lands on a word a concurrent `drain` has
    /// already swapped to zero can see a not-yet-swapped sibling word and
    /// skip as well, leaving a set bit that no queued index will ever
    /// collect. Queuing on every word transition may add duplicate indices,
    /// which `drain` already discards because a second visit reads all-zero
    /// words.
    #[inline]
    pub fn push(&self, entity_idx: EntityIndex, kind_bit: u16) {
        let word_idx = usize::from(kind_bit) / 64;
        let kind_mask = 1u64 << (u32::from(kind_bit) % 64);
        let entity_slot = entity_idx.0 as usize;
        let slot_idx = entity_slot * self.stride + word_idx;

        let prev = {
            let bits = self.bits.read();
            if let Some(word) = bits.get(slot_idx) {
                word.fetch_or(kind_mask, Ordering::Relaxed)
            } else {
                // Release the read lock before growing; `ensure_capacity`
                // takes the write lock on the same `RwLock`.
                drop(bits);
                self.ensure_capacity(entity_slot);
                let bits = self.bits.read();
                bits[slot_idx].fetch_or(kind_mask, Ordering::Relaxed)
            }
        };

        // A non-zero previous value means an earlier push on this word
        // already queued the entity (or the queued index is still pending).
        if prev == 0 {
            self.indices.lock().push(entity_idx);
        }
    }

    /// Clears the dirty bit for kind `kind_bit` of `entity_idx`.
    ///
    /// Does nothing when the entity has no storage yet. The entity is not
    /// removed from the index queue; `drain` drops entries whose words are
    /// all zero.
    #[inline]
    pub fn cancel(&self, entity_idx: EntityIndex, kind_bit: u16) {
        let word_idx = usize::from(kind_bit) / 64;
        let kind_mask = 1u64 << (u32::from(kind_bit) % 64);
        let slot_idx = (entity_idx.0 as usize) * self.stride + word_idx;

        let bits = self.bits.read();
        if let Some(word) = bits.get(slot_idx) {
            word.fetch_and(!kind_mask, Ordering::Relaxed);
        }
    }

    /// Takes every queued entity together with its dirty words, resetting
    /// those words to zero.
    ///
    /// Returns one `(entity, words)` pair per entity that still had at least
    /// one bit set; `words` always has `stride` elements. Queued entries
    /// whose words are all zero (cancelled, or duplicates of an entry already
    /// handled in this call) are omitted.
    pub fn drain(&self) -> Vec<(EntityIndex, Vec<u64>)> {
        // Take the queue first so the mutex is not held while words are read.
        let queued = std::mem::take(&mut *self.indices.lock());
        let mut out = Vec::with_capacity(queued.len());
        let bits = self.bits.read();

        for idx in queued {
            let entity_base = (idx.0 as usize) * self.stride;
            // Every word is swapped to zero, even after a non-zero one is
            // found, so the entity is fully clean afterwards.
            let words: Vec<u64> = (0..self.stride)
                .map(|w| {
                    bits.get(entity_base + w)
                        .map_or(0, |slot| slot.swap(0, Ordering::Relaxed))
                })
                .collect();
            if words.iter().any(|&word| word != 0) {
                out.push((idx, words));
            }
        }
        out
    }

    /// Returns `true` when no entity index is currently queued.
    ///
    /// Queued entries whose bits were cancelled still count as non-empty
    /// until the next `drain`.
    pub fn is_empty(&self) -> bool {
        self.indices.lock().is_empty()
    }
}
/// Alias for [`DirtyQueue`]; both names refer to the same type.
pub type DirtySet = DirtyQueue;
/// Reports dirty/clean transitions for one `(entity, kind)` pair to a
/// [`DirtySet`].
pub struct DirtyNotifier {
// Entity whose dirty state is reported.
entity_idx: EntityIndex,
// Kind bit passed to the set's `push` / `cancel`.
kind_bit: u16,
// Weak handle: notifications are silently dropped once the set is gone.
set: Weak<DirtySet>,
}
impl DirtyNotifier {
    /// Builds a notifier that reports `kind_bit` of `entity_idx` to `set`.
    pub fn new(entity_idx: EntityIndex, kind_bit: u16, set: Weak<DirtySet>) -> Self {
        Self {
            entity_idx,
            kind_bit,
            set,
        }
    }

    /// Marks the pair dirty in the target set; a no-op when the set has
    /// already been dropped.
    fn notify_dirty(&self) {
        let Some(queue) = self.set.upgrade() else {
            return;
        };
        queue.push(self.entity_idx, self.kind_bit);
    }

    /// Clears the pair's dirty bit in the target set; a no-op when the set
    /// has already been dropped.
    fn notify_clean(&self) {
        let Some(queue) = self.set.upgrade() else {
            return;
        };
        queue.cancel(self.entity_idx, self.kind_bit);
    }
}
/// Backend of a [`MutChannel`]: hands out receivers and accepts mutation
/// notifications.
pub trait MutChannelType: Send + Sync {
/// Creates a receiver for `address`, or returns `None` when the
/// implementation declines to create one.
fn new_receiver(&mut self, address: &Option<SocketAddr>) -> Option<MutReceiver>;
/// Publishes a mutation. `MutChannel::send` passes the mutated property's
/// index as `diff`.
fn send(&self, diff: u8);
}
/// Cloneable handle to a shared mutation channel; clones share the same
/// underlying [`MutChannelType`] implementation.
#[derive(Clone)]
pub struct MutChannel {
// Shared, lock-protected channel implementation.
data: Arc<RwLock<dyn MutChannelType>>,
}
impl MutChannel {
    /// Creates a new channel through `global_world_manager` and returns its
    /// sending half together with a builder for receivers.
    pub fn new_channel(
        global_world_manager: &dyn GlobalWorldManagerType,
        diff_mask_length: u8,
    ) -> (MutSender, MutReceiverBuilder) {
        let data = global_world_manager.new_mut_channel(diff_mask_length);
        let channel = Self { data };
        (channel.new_sender(), MutReceiverBuilder::new(&channel))
    }

    /// Returns a sender bound to this channel.
    pub fn new_sender(&self) -> MutSender {
        MutSender::new(self)
    }

    /// Asks the channel implementation for a receiver for `address`.
    ///
    /// Returns `None` when the implementation returns `None` or when the
    /// channel lock is poisoned.
    pub fn new_receiver(&self, address: &Option<SocketAddr>) -> Option<MutReceiver> {
        let mut guard = self.data.write().ok()?;
        guard.new_receiver(address)
    }

    /// Forwards `property_index` to the channel implementation.
    ///
    /// Returns `true` when the notification was delivered and `false` when
    /// the channel lock is poisoned.
    pub fn send(&self, property_index: u8) -> bool {
        match self.data.read() {
            Ok(guard) => {
                guard.send(property_index);
                true
            }
            Err(_) => false,
        }
    }
}
/// Receiving end of a mutation channel: accumulates mutated property bits in
/// a shared diff mask and optionally reports dirty/clean transitions.
///
/// Clones share both the mask and the notifier slot.
#[derive(Clone)]
pub struct MutReceiver {
// Shared diff mask of mutated properties.
mask: Arc<AtomicDiffMask>,
// Set-once notifier; empty until `attach_notifier` is called.
notifier: Arc<OnceLock<DirtyNotifier>>,
}
impl MutReceiver {
    /// Creates a receiver with a fresh mask built from `diff_mask_length`
    /// and no notifier attached.
    pub fn new(diff_mask_length: u8) -> Self {
        let mask = Arc::new(AtomicDiffMask::new(diff_mask_length));
        let notifier = Arc::new(OnceLock::new());
        Self { mask, notifier }
    }

    /// Attaches `notifier` if none is attached yet; later calls are ignored.
    pub fn attach_notifier(&self, notifier: DirtyNotifier) {
        // `OnceLock::set` fails once a value is present; the first notifier
        // wins and the rejected one is dropped.
        let _ = self.notifier.set(notifier);
    }

    /// Returns the mask's `snapshot()`.
    pub fn mask_snapshot(&self) -> DiffMask {
        self.mask.snapshot()
    }

    /// Returns the mask's `byte(index)`.
    pub fn mask_byte(&self, index: usize) -> u8 {
        self.mask.byte(index)
    }

    /// Returns the mask's `is_clear()`.
    pub fn diff_mask_is_clear(&self) -> bool {
        self.mask.is_clear()
    }

    /// Sets the bit for `property_index` and, when the mask reports that it
    /// was clear beforehand, notifies the attached notifier that the
    /// receiver became dirty.
    pub fn mutate(&self, property_index: u8) {
        if !self.mask.set_bit(property_index) {
            return;
        }
        if let Some(notifier) = self.notifier.get() {
            notifier.notify_dirty();
        }
    }

    /// ORs `other_mask` into the mask and, when the mask reports a
    /// clear-to-dirty transition, notifies the attached notifier.
    pub fn or_mask(&self, other_mask: &DiffMask) {
        if !self.mask.or_with(other_mask) {
            return;
        }
        if let Some(notifier) = self.notifier.get() {
            notifier.notify_dirty();
        }
    }

    /// Clears the mask and, when the mask reports that it was dirty,
    /// notifies the attached notifier that the receiver is clean again.
    pub fn clear_mask(&self) {
        if !self.mask.clear() {
            return;
        }
        if let Some(notifier) = self.notifier.get() {
            notifier.notify_clean();
        }
    }
}
/// Sending half of a [`MutChannel`]; holds its own clone of the channel
/// handle.
#[derive(Clone)]
pub struct MutSender {
// Channel that mutations are forwarded to.
channel: MutChannel,
}
impl MutSender {
    /// Creates a sender that forwards to a clone of `channel`.
    pub fn new(channel: &MutChannel) -> Self {
        let channel = channel.clone();
        Self { channel }
    }
}
impl PropertyMutate for MutSender {
    /// Sends `property_index` over the channel and returns the result of
    /// `MutChannel::send` unchanged.
    fn mutate(&mut self, property_index: u8) -> bool {
        let delivered = self.channel.send(property_index);
        delivered
    }
}
/// Factory that requests [`MutReceiver`]s from a [`MutChannel`].
pub struct MutReceiverBuilder {
// Channel that receivers are requested from.
channel: MutChannel,
}
impl MutReceiverBuilder {
    /// Creates a builder bound to a clone of `channel`.
    pub fn new(channel: &MutChannel) -> Self {
        let channel = channel.clone();
        Self { channel }
    }

    /// Requests a receiver for `address` from the channel; returns whatever
    /// `MutChannel::new_receiver` returns.
    pub fn build(&self, address: &Option<SocketAddr>) -> Option<MutReceiver> {
        let channel = &self.channel;
        channel.new_receiver(address)
    }
}
#[cfg(test)]
mod dirty_queue_unlimited_kinds_tests {
    use super::*;
    use crate::EntityIndex;
    use std::sync::Arc;

    /// One word per started group of 64 kinds.
    #[test]
    fn stride_grows_with_kind_count() {
        let cases: [(u16, usize); 6] = [
            (1, 1),
            (64, 1),
            (65, 2),
            (128, 2),
            (129, 3),
            (1024, 16),
        ];
        for (kind_count, expected_stride) in cases {
            assert_eq!(DirtyQueue::new(kind_count).stride(), expected_stride);
        }
    }

    /// Kind bits on both sides of every word boundary come back from `drain`
    /// at the same positions they were pushed at.
    #[test]
    fn kind_bit_above_64_round_trips() {
        let queue = Arc::new(DirtyQueue::new(200));
        queue.ensure_capacity(0);
        let pushed: [u16; 7] = [0, 63, 64, 65, 127, 128, 199];
        for kind_bit in pushed {
            queue.push(EntityIndex(0), kind_bit);
        }

        let drained = queue.drain();
        assert_eq!(drained.len(), 1);
        let (idx, words) = &drained[0];
        assert_eq!(*idx, EntityIndex(0));
        assert_eq!(words.len(), queue.stride());

        // Rebuild the absolute kind-bit numbers from the drained words.
        let mut recovered: Vec<u16> = words
            .iter()
            .enumerate()
            .flat_map(|(word_idx, &word)| {
                (0..64u16)
                    .filter(move |&bit| (word >> bit) & 1 == 1)
                    .map(move |bit| (word_idx as u16) * 64 + bit)
            })
            .collect();
        recovered.sort();
        assert_eq!(recovered, vec![0, 63, 64, 65, 127, 128, 199]);
    }

    /// A cancelled bit in a high word leaves nothing for `drain` to report.
    #[test]
    fn cancel_clears_high_kind_bit() {
        let entity = EntityIndex(0);
        let queue = DirtyQueue::new(200);
        queue.ensure_capacity(0);
        queue.push(entity, 130);
        queue.cancel(entity, 130);
        assert!(queue.drain().is_empty());
    }

    /// Bits pushed into two different words of one entity are reported as a
    /// single drained entry carrying both words.
    #[test]
    fn multi_word_was_clear_fires_index_push_once() {
        let entity = EntityIndex(0);
        let queue = DirtyQueue::new(200);
        queue.ensure_capacity(0);
        queue.push(entity, 5);
        queue.push(entity, 130);

        let drained = queue.drain();
        assert_eq!(drained.len(), 1, "expected dedup via drain swap-zero");
        let words = &drained[0].1;
        assert_eq!(words[0], 1u64 << 5);
        assert_eq!(words[2], 1u64 << (130 - 128));
    }
}