Skip to main content

naia_shared/world/update/
mut_channel.rs

1use std::{
2    net::SocketAddr,
3    sync::{
4        atomic::{AtomicU64, Ordering},
5        Arc, OnceLock, RwLock, Weak,
6    },
7};
8
9use parking_lot::{Mutex as PlMutex, RwLock as PlRwLock};
10
11use crate::world::entity_index::EntityIndex;
12use crate::world::update::atomic_diff_mask::AtomicDiffMask;
13use crate::{DiffMask, GlobalWorldManagerType, PropertyMutate};
14
15/// Per-user dirty queue (Phase 9.4 / Stage E + B-strict + 2026-05-05
16/// unlimited-kind-count refactor).
17///
18/// Tracks, per `EntityIndex`, which `ComponentKind`s are currently
19/// dirty. Lock-free hot path; cold-path resize on entity allocation.
20///
21/// ## Variable-width kind bitset (no more 64-kind limit)
22///
23/// Each entity gets `stride` `AtomicU64` words of dirty bits, where
24/// `stride = ceil(kind_count / 64)`. `kind_bit` of value `K` lives in
25/// word `K / 64` at bit position `K % 64`. The flat layout is
26/// `bits[entity_idx * stride + word_idx]` — one contiguous `Vec` for
27/// all entities, each entity occupying `stride` consecutive words.
28/// `stride` is set at construction from the protocol's locked
29/// component-kind count and never changes.
30///
31/// Pre-2026-05-05 the bits were a single `Vec<AtomicU64>` (one word
32/// per entity = ≤64 kinds). The cap was a `debug_assert!` in
33/// `ComponentKinds::add_component`. cyberlith and other large
34/// protocols were going to hit it.
35///
36/// ## Lock + atomic discipline
37///
38/// - `bits` is wrapped in `PlRwLock<Vec<AtomicU64>>`: write-guard for
39///   `ensure_capacity` (cold path), read-guard for hot-path `fetch_or`
40///   / `fetch_and` / `swap`. Resize is the only writer.
41/// - `indices` is a cold-path `PlMutex<Vec<EntityIndex>>` — locked
42///   only on first-bit-set-per-entity push and at drain. Tolerates
43///   duplicate entries (drain dedupes via the bitset's per-word swap).
44///
45/// ## "Was clear" semantics under multi-word
46///
47/// `push` returns `was_clear == true` (and locks `indices` to push)
48/// when the kind_bit's word was zero before our `fetch_or` AND the
49/// other words for this entity are also zero (relaxed loads — race-
50/// tolerant). Concurrent pushes to different words of the same entity
51/// might both report was_clear and double-push the index; the
52/// `indices` Vec accepts duplicates and drain swap-zeroes the bits
53/// once, so the duplicate entry contributes nothing on the second
54/// drain pass. Net contract: at-least-once index entry per
55/// clean→dirty transition, with rare benign duplicates.
56///
57/// Wire-format invariant unchanged — this is CPU-only bookkeeping.
58pub struct DirtyQueue {
59    /// Flat `Vec<AtomicU64>`, length = `entity_count * stride`. Word
60    /// for `(entity_idx, word_idx)` is at index
61    /// `entity_idx * stride + word_idx`. Resized only by
62    /// `ensure_capacity` under the `RwLock` write guard; hot-path
63    /// `fetch_or` / `fetch_and` / `swap` access individual slots
64    /// under the read guard.
65    bits: PlRwLock<Vec<AtomicU64>>,
66    /// Words per entity = `ceil(kind_count / 64).max(1)`. Set at
67    /// construction; never changes (protocol is locked before any
68    /// `DirtyQueue` is created).
69    stride: usize,
70    /// Cold-path-only mutex: locked at first-bit-set-per-entity push
71    /// and at drain. Tolerates duplicate entries — drain dedupes via
72    /// `bits`.
73    indices: PlMutex<Vec<EntityIndex>>,
74}
75
76impl DirtyQueue {
77    /// Construct with capacity for `kind_count` distinct
78    /// `ComponentKind`s (= `kind_count` distinct `kind_bit` values).
79    /// `stride` is derived as `ceil(kind_count / 64).max(1)`. Common
80    /// case: `kind_count ≤ 64` → `stride == 1` (zero-overhead vs the
81    /// old single-`AtomicU64` layout).
82    pub fn new(kind_count: u16) -> Self {
83        let stride = ((kind_count as usize).div_ceil(64)).max(1);
84        Self {
85            bits: PlRwLock::new(Vec::new()),
86            stride,
87            indices: PlMutex::new(Vec::new()),
88        }
89    }
90
91    /// Words per entity in this queue. Public for tests + bench
92    /// instrumentation; production code shouldn't need it.
93    pub fn stride(&self) -> usize {
94        self.stride
95    }
96
97    /// Pre-grow `bits` to cover at least `slot + 1` entities. Cold
98    /// path — called from `UserDiffHandler::allocate_entity_index`
99    /// synchronously before the issued `EntityIndex` is exposed to
100    /// any mutation. Takes the write guard, which excludes hot-path
101    /// readers; safe because allocation runs on the same thread that
102    /// issues mutations.
103    pub fn ensure_capacity(&self, slot: usize) {
104        let needed = (slot + 1) * self.stride;
105        if self.bits.read().len() >= needed {
106            return;
107        }
108        let mut w = self.bits.write();
109        while w.len() < needed {
110            w.push(AtomicU64::new(0));
111        }
112    }
113
114    /// Mark `(entity_idx, kind_bit)` dirty. Lock-free atomic on the
115    /// bits side; cold-path mutex push only on clean→dirty transition
116    /// for this entity. `kind_bit` widened to `u16` (was `u8`) to
117    /// support arbitrary protocol kind counts.
118    #[inline]
119    pub fn push(&self, entity_idx: EntityIndex, kind_bit: u16) {
120        let word_idx = (kind_bit as usize) / 64;
121        let bit_in_word = (kind_bit as u32) % 64;
122        let kind_mask = 1u64 << bit_in_word;
123        let entity_base = (entity_idx.0 as usize) * self.stride;
124        let slot_idx = entity_base + word_idx;
125
126        let prev = {
127            let bits = self.bits.read();
128            if let Some(slot) = bits.get(slot_idx) {
129                slot.fetch_or(kind_mask, Ordering::Relaxed)
130            } else {
131                drop(bits);
132                // Defensive: ensure capacity then retry. Should not
133                // happen in production — `allocate_entity_index`
134                // pre-grows. Cost: one extra read + write lock pair,
135                // only on misconfigured callers.
136                self.ensure_capacity(entity_idx.0 as usize);
137                let bits = self.bits.read();
138                bits[slot_idx].fetch_or(kind_mask, Ordering::Relaxed)
139            }
140        };
141
142        if prev != 0 {
143            return;
144        }
145        // Word was zero before our fetch_or. Check whether the other
146        // words for this entity are also zero. Race-tolerant: if a
147        // concurrent push to another word happens between our load
148        // and theirs, both might report was_clear and both push to
149        // `indices` — drain dedupes via the per-word swap.
150        let was_clear = if self.stride == 1 {
151            true
152        } else {
153            let bits = self.bits.read();
154            (0..self.stride).all(|w| {
155                if w == word_idx {
156                    return true;
157                }
158                bits.get(entity_base + w)
159                    .map(|word| word.load(Ordering::Relaxed) == 0)
160                    .unwrap_or(true)
161            })
162        };
163        if was_clear {
164            self.indices.lock().push(entity_idx);
165        }
166    }
167
168    /// Clear `(entity_idx, kind_bit)`. Atomic `fetch_and` on the bits
169    /// side; never touches the indices mutex (drain dedupes stale
170    /// entries). Tolerates out-of-range slots (returns silently).
171    #[inline]
172    pub fn cancel(&self, entity_idx: EntityIndex, kind_bit: u16) {
173        let word_idx = (kind_bit as usize) / 64;
174        let bit_in_word = (kind_bit as u32) % 64;
175        let kind_mask = 1u64 << bit_in_word;
176        let slot_idx = (entity_idx.0 as usize) * self.stride + word_idx;
177        let bits = self.bits.read();
178        if let Some(slot) = bits.get(slot_idx) {
179            slot.fetch_and(!kind_mask, Ordering::Relaxed);
180        }
181    }
182
183    /// Drain: take ownership of the indices list, then atomically
184    /// swap-zero every word of each indexed entity. Returns owned
185    /// `(EntityIndex, dirty_words)` pairs where `dirty_words` is a
186    /// `Vec<u64>` of length `stride` (one word per kind-word). Entries
187    /// that ended up zero across all words (cancelled or already
188    /// drained) are skipped.
189    pub fn drain(&self) -> Vec<(EntityIndex, Vec<u64>)> {
190        let indices: Vec<EntityIndex> = std::mem::take(&mut *self.indices.lock());
191        let mut out: Vec<(EntityIndex, Vec<u64>)> = Vec::with_capacity(indices.len());
192        let bits = self.bits.read();
193        for idx in indices {
194            let entity_base = (idx.0 as usize) * self.stride;
195            let mut words: Vec<u64> = Vec::with_capacity(self.stride);
196            let mut any = false;
197            for w in 0..self.stride {
198                let v = bits
199                    .get(entity_base + w)
200                    .map(|slot| slot.swap(0, Ordering::Relaxed))
201                    .unwrap_or(0);
202                if v != 0 {
203                    any = true;
204                }
205                words.push(v);
206            }
207            if any {
208                out.push((idx, words));
209            }
210        }
211        out
212    }
213
214    /// Returns `true` if no entity indices are currently queued for draining.
215    pub fn is_empty(&self) -> bool {
216        self.indices.lock().is_empty()
217    }
218}
219
220/// Shared dirty queue owned by a `UserDiffHandler`. MutReceivers hold a
221/// `Weak` into this and call `push` directly — `DirtyQueue` provides
222/// interior mutability via its inner `RwLock`/`Mutex` so there is no
223/// outer `Mutex<DirtyQueue>` wrapper. B-strict made the bits-side
224/// fetch_or lock-free under a read guard.
225pub type DirtySet = DirtyQueue;
226
227/// Identifies a MutReceiver's position inside its owning UserDiffHandler's
228/// dirty set. Installed once per receiver via `MutReceiver::attach_notifier`
229/// (OnceLock — all clones share the notifier). Carries the per-user
230/// `EntityIndex` and the protocol-wide `kind_bit` (= ComponentKind's NetId)
231/// — both resolved once at registration time, so notify is a Vec OR, not a
232/// hash.
233/// Lightweight handle installed in a [`MutReceiver`] to push dirty notifications into a [`DirtySet`] on mutation.
234pub struct DirtyNotifier {
235    entity_idx: EntityIndex,
236    kind_bit: u16,
237    set: Weak<DirtySet>,
238}
239
240impl DirtyNotifier {
241    /// Creates a `DirtyNotifier` that marks `(entity_idx, kind_bit)` dirty in `set` on mutation.
242    pub fn new(
243        entity_idx: EntityIndex,
244        kind_bit: u16,
245        set: Weak<DirtySet>,
246    ) -> Self {
247        Self { entity_idx, kind_bit, set }
248    }
249
250    fn notify_dirty(&self) {
251        if let Some(set) = self.set.upgrade() {
252            set.push(self.entity_idx, self.kind_bit);
253        }
254    }
255
256    fn notify_clean(&self) {
257        if let Some(set) = self.set.upgrade() {
258            set.cancel(self.entity_idx, self.kind_bit);
259        }
260    }
261}
262
263/// Internal trait implemented by the concrete mutation channel; produces receivers and propagates property-index notifications.
264pub trait MutChannelType: Send + Sync {
265    /// Creates and returns a new [`MutReceiver`] bound to `address`, or `None` if the address is excluded.
266    fn new_receiver(&mut self, address: &Option<SocketAddr>) -> Option<MutReceiver>;
267    /// Notifies all receivers that property `diff` has changed.
268    fn send(&self, diff: u8);
269}
270
271/// Shared mutation channel that connects a component's property mutator to all interested receivers.
272#[derive(Clone)]
273pub struct MutChannel {
274    data: Arc<RwLock<dyn MutChannelType>>,
275}
276
277impl MutChannel {
278    /// Creates a new `(MutSender, MutReceiverBuilder)` pair backed by a channel allocated through `global_world_manager`.
279    pub fn new_channel(
280        global_world_manager: &dyn GlobalWorldManagerType,
281        diff_mask_length: u8,
282    ) -> (MutSender, MutReceiverBuilder) {
283        let channel = Self {
284            data: global_world_manager.new_mut_channel(diff_mask_length),
285        };
286
287        let sender = channel.new_sender();
288
289        let builder = MutReceiverBuilder::new(&channel);
290
291        (sender, builder)
292    }
293
294    /// Returns a new [`MutSender`] that forwards property-index notifications into this channel.
295    pub fn new_sender(&self) -> MutSender {
296        MutSender::new(self)
297    }
298
299    /// Creates a new [`MutReceiver`] for `address`, or `None` if the channel excludes this address.
300    pub fn new_receiver(&self, address: &Option<SocketAddr>) -> Option<MutReceiver> {
301        if let Ok(mut data) = self.data.as_ref().write() {
302            return data.new_receiver(address);
303        }
304        None
305    }
306
307    /// Propagates a property-index notification to all receivers; returns `false` if the channel lock is poisoned.
308    pub fn send(&self, property_index: u8) -> bool {
309        if let Ok(data) = self.data.as_ref().read() {
310            data.send(property_index);
311            return true;
312        }
313        false
314    }
315}
316
317// MutReceiver — atomic, lock-free hot path.
318//
319// Phase 8.1 Stage C (2026-04-25): replaced `Arc<RwLock<DiffMask>>` with
320// `Arc<AtomicDiffMask>`. `mutate(prop_idx)` is now a single atomic
321// `fetch_or` instead of a `RwLock::write` + `Vec<u8>::set`-bit dance. The
322// `was_clear` signal that gates `notify_dirty` is the same `prev == 0`
323// check the atomic returns — semantics are byte-for-byte identical to
324// the prior implementation, but the per-mutation cost drops from a
325// lock-acquire round trip to one cache-line atomic.
326//
327// `Arc` is retained only because each user clones the same receiver via
328// `MutChannelData::new_receiver`, so the inner mask must be shared. The
329// notifier is `Arc<OnceLock<...>>` for the same reason.
330/// Per-user receiver that accumulates dirty bits for a single component and notifies the user's [`DirtySet`] on first mutation.
331#[derive(Clone)]
332pub struct MutReceiver {
333    mask: Arc<AtomicDiffMask>,
334    notifier: Arc<OnceLock<DirtyNotifier>>,
335}
336
337impl MutReceiver {
338    /// Creates a `MutReceiver` with an atomic diff mask of `diff_mask_length` bytes.
339    pub fn new(diff_mask_length: u8) -> Self {
340        Self {
341            mask: Arc::new(AtomicDiffMask::new(diff_mask_length)),
342            notifier: Arc::new(OnceLock::new()),
343        }
344    }
345
346    /// Installed once per receiver by UserDiffHandler::register_component.
347    /// Cheap no-op on re-attachment (OnceLock::set returns Err, ignored).
348    pub fn attach_notifier(&self, notifier: DirtyNotifier) {
349        let _ = self.notifier.set(notifier);
350    }
351
352    /// Snapshot the receiver's current mask into an owned `DiffMask`.
353    /// Used by `world_writer` when copying the mask into `sent_updates`
354    /// before clearing the receiver. Replaces the prior
355    /// `RwLockReadGuard<'_, DiffMask>` API which forced callers to clone
356    /// while holding a read lock.
357    pub fn mask_snapshot(&self) -> DiffMask {
358        self.mask.snapshot()
359    }
360
361    /// Read one byte of the receiver's mask. Cheaper than `mask_snapshot()`
362    /// when callers only need a single byte (currently unused but kept as
363    /// the obvious primitive on top of the atomic representation).
364    pub fn mask_byte(&self, index: usize) -> u8 {
365        self.mask.byte(index)
366    }
367
368    /// Returns `true` if no property bits are currently set in this receiver's diff mask.
369    pub fn diff_mask_is_clear(&self) -> bool {
370        self.mask.is_clear()
371    }
372
373    /// Marks `property_index` dirty in the diff mask, notifying the dirty queue if the mask transitions from clean to dirty.
374    pub fn mutate(&self, property_index: u8) {
375        let was_clear = self.mask.set_bit(property_index);
376        if was_clear {
377            if let Some(n) = self.notifier.get() {
378                n.notify_dirty();
379            }
380        }
381    }
382
383    /// ORs `other_mask` into the diff mask, notifying the dirty queue if the mask transitions from clean to dirty.
384    pub fn or_mask(&self, other_mask: &DiffMask) {
385        let was_clear_now_dirty = self.mask.or_with(other_mask);
386        if was_clear_now_dirty {
387            if let Some(n) = self.notifier.get() {
388                n.notify_dirty();
389            }
390        }
391    }
392
393    /// Clears all bits in the diff mask and notifies the dirty queue if the mask was dirty.
394    pub fn clear_mask(&self) {
395        let was_dirty = self.mask.clear();
396        if was_dirty {
397            if let Some(n) = self.notifier.get() {
398                n.notify_clean();
399            }
400        }
401    }
402}
403
404/// Write-only handle that forwards property-mutation notifications into a [`MutChannel`].
405#[derive(Clone)]
406pub struct MutSender {
407    channel: MutChannel,
408}
409
410impl MutSender {
411    /// Creates a `MutSender` backed by `channel`.
412    pub fn new(channel: &MutChannel) -> Self {
413        Self {
414            channel: channel.clone(),
415        }
416    }
417}
418
419impl PropertyMutate for MutSender {
420    fn mutate(&mut self, property_index: u8) -> bool {
421        
422        self.channel.send(property_index)
423    }
424}
425
426/// Factory that produces per-user [`MutReceiver`]s from a shared [`MutChannel`].
427pub struct MutReceiverBuilder {
428    channel: MutChannel,
429}
430
431impl MutReceiverBuilder {
432    /// Creates a `MutReceiverBuilder` backed by `channel`.
433    pub fn new(channel: &MutChannel) -> Self {
434        Self {
435            channel: channel.clone(),
436        }
437    }
438
439    /// Builds a new [`MutReceiver`] for `address`, or `None` if the channel excludes this address.
440    pub fn build(&self, address: &Option<SocketAddr>) -> Option<MutReceiver> {
441        self.channel.new_receiver(address)
442    }
443}
444
#[cfg(test)]
mod dirty_queue_unlimited_kinds_tests {
    //! Guards the post-T1.3 invariant that the per-user `DirtyQueue` is
    //! not limited to 64 component kinds: the flat, strided
    //! `Vec<AtomicU64>` grows with `kind_count`, and `kind_bit` values of
    //! 64 and above survive a `push` → `drain` round trip.
    use super::*;
    use crate::EntityIndex;
    use std::sync::Arc;

    #[test]
    fn stride_grows_with_kind_count() {
        let expectations: [(u16, usize); 6] =
            [(1, 1), (64, 1), (65, 2), (128, 2), (129, 3), (1024, 16)];
        for (kind_count, expected_stride) in expectations {
            assert_eq!(DirtyQueue::new(kind_count).stride(), expected_stride);
        }
    }

    #[test]
    fn kind_bit_above_64_round_trips() {
        let queue = Arc::new(DirtyQueue::new(200));
        queue.ensure_capacity(0);
        // Before T1.3 these kind_bits could not be registered at all: the
        // assertion in ComponentKinds::add_component capped kinds at 64.
        let pushed: [u16; 7] = [0, 63, 64, 65, 127, 128, 199];
        for kind_bit in pushed {
            queue.push(EntityIndex(0), kind_bit);
        }

        let drained = queue.drain();
        assert_eq!(drained.len(), 1);
        let (idx, words) = &drained[0];
        assert_eq!(*idx, EntityIndex(0));
        assert_eq!(words.len(), queue.stride());

        // Turn every set bit back into its absolute kind_bit value.
        let mut recovered: Vec<u16> = words
            .iter()
            .enumerate()
            .flat_map(|(w, &word)| {
                (0..64u16)
                    .filter(move |&bit| word & (1u64 << bit) != 0)
                    .map(move |bit| (w as u16) * 64 + bit)
            })
            .collect();
        recovered.sort();
        assert_eq!(recovered, vec![0, 63, 64, 65, 127, 128, 199]);
    }

    #[test]
    fn cancel_clears_high_kind_bit() {
        let queue = DirtyQueue::new(200);
        queue.ensure_capacity(0);
        queue.push(EntityIndex(0), 130);
        queue.cancel(EntityIndex(0), 130);
        // The cancelled bit leaves all words zero, so drain skips the
        // (still queued) entity.
        assert!(queue.drain().is_empty());
    }

    #[test]
    fn multi_word_was_clear_fires_index_push_once() {
        let queue = DirtyQueue::new(200);
        queue.ensure_capacity(0);
        // Two different words of one entity get dirtied. The index list
        // may end up holding the entity more than once, but drain
        // swap-zeroes the words on the first visit, so exactly one entry
        // must come out.
        queue.push(EntityIndex(0), 5);
        queue.push(EntityIndex(0), 130);
        let drained = queue.drain();
        assert_eq!(drained.len(), 1, "expected dedup via drain swap-zero");
        let words = &drained[0].1;
        assert_eq!(words[0], 1u64 << 5);
        assert_eq!(words[2], 1u64 << (130 - 128));
    }
}
519}