naia_shared/world/update/mut_channel.rs
1use std::{
2 net::SocketAddr,
3 sync::{
4 atomic::{AtomicU64, Ordering},
5 Arc, OnceLock, RwLock, Weak,
6 },
7};
8
9use parking_lot::{Mutex as PlMutex, RwLock as PlRwLock};
10
11use crate::world::entity_index::EntityIndex;
12use crate::world::update::atomic_diff_mask::AtomicDiffMask;
13use crate::{DiffMask, GlobalWorldManagerType, PropertyMutate};
14
15/// Per-user dirty queue (Phase 9.4 / Stage E + B-strict + 2026-05-05
16/// unlimited-kind-count refactor).
17///
18/// Tracks, per `EntityIndex`, which `ComponentKind`s are currently
19/// dirty. Lock-free hot path; cold-path resize on entity allocation.
20///
21/// ## Variable-width kind bitset (no more 64-kind limit)
22///
23/// Each entity gets `stride` `AtomicU64` words of dirty bits, where
24/// `stride = ceil(kind_count / 64)`. `kind_bit` of value `K` lives in
25/// word `K / 64` at bit position `K % 64`. The flat layout is
26/// `bits[entity_idx * stride + word_idx]` — one contiguous `Vec` for
27/// all entities, each entity occupying `stride` consecutive words.
28/// `stride` is set at construction from the protocol's locked
29/// component-kind count and never changes.
30///
31/// Pre-2026-05-05 the bits were a single `Vec<AtomicU64>` (one word
32/// per entity = ≤64 kinds). The cap was a `debug_assert!` in
33/// `ComponentKinds::add_component`. cyberlith and other large
34/// protocols were going to hit it.
35///
36/// ## Lock + atomic discipline
37///
38/// - `bits` is wrapped in `PlRwLock<Vec<AtomicU64>>`: write-guard for
39/// `ensure_capacity` (cold path), read-guard for hot-path `fetch_or`
40/// / `fetch_and` / `swap`. Resize is the only writer.
41/// - `indices` is a cold-path `PlMutex<Vec<EntityIndex>>` — locked
42/// only on first-bit-set-per-entity push and at drain. Tolerates
43/// duplicate entries (drain dedupes via the bitset's per-word swap).
44///
45/// ## "Was clear" semantics under multi-word
46///
47/// `push` returns `was_clear == true` (and locks `indices` to push)
48/// when the kind_bit's word was zero before our `fetch_or` AND the
49/// other words for this entity are also zero (relaxed loads — race-
50/// tolerant). Concurrent pushes to different words of the same entity
51/// might both report was_clear and double-push the index; the
52/// `indices` Vec accepts duplicates and drain swap-zeroes the bits
53/// once, so the duplicate entry contributes nothing on the second
54/// drain pass. Net contract: at-least-once index entry per
55/// clean→dirty transition, with rare benign duplicates.
56///
57/// Wire-format invariant unchanged — this is CPU-only bookkeeping.
58pub struct DirtyQueue {
59 /// Flat `Vec<AtomicU64>`, length = `entity_count * stride`. Word
60 /// for `(entity_idx, word_idx)` is at index
61 /// `entity_idx * stride + word_idx`. Resized only by
62 /// `ensure_capacity` under the `RwLock` write guard; hot-path
63 /// `fetch_or` / `fetch_and` / `swap` access individual slots
64 /// under the read guard.
65 bits: PlRwLock<Vec<AtomicU64>>,
66 /// Words per entity = `ceil(kind_count / 64).max(1)`. Set at
67 /// construction; never changes (protocol is locked before any
68 /// `DirtyQueue` is created).
69 stride: usize,
70 /// Cold-path-only mutex: locked at first-bit-set-per-entity push
71 /// and at drain. Tolerates duplicate entries — drain dedupes via
72 /// `bits`.
73 indices: PlMutex<Vec<EntityIndex>>,
74}
75
76impl DirtyQueue {
77 /// Construct with capacity for `kind_count` distinct
78 /// `ComponentKind`s (= `kind_count` distinct `kind_bit` values).
79 /// `stride` is derived as `ceil(kind_count / 64).max(1)`. Common
80 /// case: `kind_count ≤ 64` → `stride == 1` (zero-overhead vs the
81 /// old single-`AtomicU64` layout).
82 pub fn new(kind_count: u16) -> Self {
83 let stride = ((kind_count as usize).div_ceil(64)).max(1);
84 Self {
85 bits: PlRwLock::new(Vec::new()),
86 stride,
87 indices: PlMutex::new(Vec::new()),
88 }
89 }
90
91 /// Words per entity in this queue. Public for tests + bench
92 /// instrumentation; production code shouldn't need it.
93 pub fn stride(&self) -> usize {
94 self.stride
95 }
96
97 /// Pre-grow `bits` to cover at least `slot + 1` entities. Cold
98 /// path — called from `UserDiffHandler::allocate_entity_index`
99 /// synchronously before the issued `EntityIndex` is exposed to
100 /// any mutation. Takes the write guard, which excludes hot-path
101 /// readers; safe because allocation runs on the same thread that
102 /// issues mutations.
103 pub fn ensure_capacity(&self, slot: usize) {
104 let needed = (slot + 1) * self.stride;
105 if self.bits.read().len() >= needed {
106 return;
107 }
108 let mut w = self.bits.write();
109 while w.len() < needed {
110 w.push(AtomicU64::new(0));
111 }
112 }
113
114 /// Mark `(entity_idx, kind_bit)` dirty. Lock-free atomic on the
115 /// bits side; cold-path mutex push only on clean→dirty transition
116 /// for this entity. `kind_bit` widened to `u16` (was `u8`) to
117 /// support arbitrary protocol kind counts.
118 #[inline]
119 pub fn push(&self, entity_idx: EntityIndex, kind_bit: u16) {
120 let word_idx = (kind_bit as usize) / 64;
121 let bit_in_word = (kind_bit as u32) % 64;
122 let kind_mask = 1u64 << bit_in_word;
123 let entity_base = (entity_idx.0 as usize) * self.stride;
124 let slot_idx = entity_base + word_idx;
125
126 let prev = {
127 let bits = self.bits.read();
128 if let Some(slot) = bits.get(slot_idx) {
129 slot.fetch_or(kind_mask, Ordering::Relaxed)
130 } else {
131 drop(bits);
132 // Defensive: ensure capacity then retry. Should not
133 // happen in production — `allocate_entity_index`
134 // pre-grows. Cost: one extra read + write lock pair,
135 // only on misconfigured callers.
136 self.ensure_capacity(entity_idx.0 as usize);
137 let bits = self.bits.read();
138 bits[slot_idx].fetch_or(kind_mask, Ordering::Relaxed)
139 }
140 };
141
142 if prev != 0 {
143 return;
144 }
145 // Word was zero before our fetch_or. Check whether the other
146 // words for this entity are also zero. Race-tolerant: if a
147 // concurrent push to another word happens between our load
148 // and theirs, both might report was_clear and both push to
149 // `indices` — drain dedupes via the per-word swap.
150 let was_clear = if self.stride == 1 {
151 true
152 } else {
153 let bits = self.bits.read();
154 (0..self.stride).all(|w| {
155 if w == word_idx {
156 return true;
157 }
158 bits.get(entity_base + w)
159 .map(|word| word.load(Ordering::Relaxed) == 0)
160 .unwrap_or(true)
161 })
162 };
163 if was_clear {
164 self.indices.lock().push(entity_idx);
165 }
166 }
167
168 /// Clear `(entity_idx, kind_bit)`. Atomic `fetch_and` on the bits
169 /// side; never touches the indices mutex (drain dedupes stale
170 /// entries). Tolerates out-of-range slots (returns silently).
171 #[inline]
172 pub fn cancel(&self, entity_idx: EntityIndex, kind_bit: u16) {
173 let word_idx = (kind_bit as usize) / 64;
174 let bit_in_word = (kind_bit as u32) % 64;
175 let kind_mask = 1u64 << bit_in_word;
176 let slot_idx = (entity_idx.0 as usize) * self.stride + word_idx;
177 let bits = self.bits.read();
178 if let Some(slot) = bits.get(slot_idx) {
179 slot.fetch_and(!kind_mask, Ordering::Relaxed);
180 }
181 }
182
183 /// Drain: take ownership of the indices list, then atomically
184 /// swap-zero every word of each indexed entity. Returns owned
185 /// `(EntityIndex, dirty_words)` pairs where `dirty_words` is a
186 /// `Vec<u64>` of length `stride` (one word per kind-word). Entries
187 /// that ended up zero across all words (cancelled or already
188 /// drained) are skipped.
189 pub fn drain(&self) -> Vec<(EntityIndex, Vec<u64>)> {
190 let indices: Vec<EntityIndex> = std::mem::take(&mut *self.indices.lock());
191 let mut out: Vec<(EntityIndex, Vec<u64>)> = Vec::with_capacity(indices.len());
192 let bits = self.bits.read();
193 for idx in indices {
194 let entity_base = (idx.0 as usize) * self.stride;
195 let mut words: Vec<u64> = Vec::with_capacity(self.stride);
196 let mut any = false;
197 for w in 0..self.stride {
198 let v = bits
199 .get(entity_base + w)
200 .map(|slot| slot.swap(0, Ordering::Relaxed))
201 .unwrap_or(0);
202 if v != 0 {
203 any = true;
204 }
205 words.push(v);
206 }
207 if any {
208 out.push((idx, words));
209 }
210 }
211 out
212 }
213
214 /// Returns `true` if no entity indices are currently queued for draining.
215 pub fn is_empty(&self) -> bool {
216 self.indices.lock().is_empty()
217 }
218}
219
220/// Shared dirty queue owned by a `UserDiffHandler`. MutReceivers hold a
221/// `Weak` into this and call `push` directly — `DirtyQueue` provides
222/// interior mutability via its inner `RwLock`/`Mutex` so there is no
223/// outer `Mutex<DirtyQueue>` wrapper. B-strict made the bits-side
224/// fetch_or lock-free under a read guard.
225pub type DirtySet = DirtyQueue;
226
227/// Identifies a MutReceiver's position inside its owning UserDiffHandler's
228/// dirty set. Installed once per receiver via `MutReceiver::attach_notifier`
229/// (OnceLock — all clones share the notifier). Carries the per-user
230/// `EntityIndex` and the protocol-wide `kind_bit` (= ComponentKind's NetId)
231/// — both resolved once at registration time, so notify is a Vec OR, not a
232/// hash.
233/// Lightweight handle installed in a [`MutReceiver`] to push dirty notifications into a [`DirtySet`] on mutation.
234pub struct DirtyNotifier {
235 entity_idx: EntityIndex,
236 kind_bit: u16,
237 set: Weak<DirtySet>,
238}
239
240impl DirtyNotifier {
241 /// Creates a `DirtyNotifier` that marks `(entity_idx, kind_bit)` dirty in `set` on mutation.
242 pub fn new(
243 entity_idx: EntityIndex,
244 kind_bit: u16,
245 set: Weak<DirtySet>,
246 ) -> Self {
247 Self { entity_idx, kind_bit, set }
248 }
249
250 fn notify_dirty(&self) {
251 if let Some(set) = self.set.upgrade() {
252 set.push(self.entity_idx, self.kind_bit);
253 }
254 }
255
256 fn notify_clean(&self) {
257 if let Some(set) = self.set.upgrade() {
258 set.cancel(self.entity_idx, self.kind_bit);
259 }
260 }
261}
262
263/// Internal trait implemented by the concrete mutation channel; produces receivers and propagates property-index notifications.
264pub trait MutChannelType: Send + Sync {
265 /// Creates and returns a new [`MutReceiver`] bound to `address`, or `None` if the address is excluded.
266 fn new_receiver(&mut self, address: &Option<SocketAddr>) -> Option<MutReceiver>;
267 /// Notifies all receivers that property `diff` has changed.
268 fn send(&self, diff: u8);
269}
270
271/// Shared mutation channel that connects a component's property mutator to all interested receivers.
272#[derive(Clone)]
273pub struct MutChannel {
274 data: Arc<RwLock<dyn MutChannelType>>,
275}
276
277impl MutChannel {
278 /// Creates a new `(MutSender, MutReceiverBuilder)` pair backed by a channel allocated through `global_world_manager`.
279 pub fn new_channel(
280 global_world_manager: &dyn GlobalWorldManagerType,
281 diff_mask_length: u8,
282 ) -> (MutSender, MutReceiverBuilder) {
283 let channel = Self {
284 data: global_world_manager.new_mut_channel(diff_mask_length),
285 };
286
287 let sender = channel.new_sender();
288
289 let builder = MutReceiverBuilder::new(&channel);
290
291 (sender, builder)
292 }
293
294 /// Returns a new [`MutSender`] that forwards property-index notifications into this channel.
295 pub fn new_sender(&self) -> MutSender {
296 MutSender::new(self)
297 }
298
299 /// Creates a new [`MutReceiver`] for `address`, or `None` if the channel excludes this address.
300 pub fn new_receiver(&self, address: &Option<SocketAddr>) -> Option<MutReceiver> {
301 if let Ok(mut data) = self.data.as_ref().write() {
302 return data.new_receiver(address);
303 }
304 None
305 }
306
307 /// Propagates a property-index notification to all receivers; returns `false` if the channel lock is poisoned.
308 pub fn send(&self, property_index: u8) -> bool {
309 if let Ok(data) = self.data.as_ref().read() {
310 data.send(property_index);
311 return true;
312 }
313 false
314 }
315}
316
317// MutReceiver — atomic, lock-free hot path.
318//
319// Phase 8.1 Stage C (2026-04-25): replaced `Arc<RwLock<DiffMask>>` with
320// `Arc<AtomicDiffMask>`. `mutate(prop_idx)` is now a single atomic
321// `fetch_or` instead of a `RwLock::write` + `Vec<u8>::set`-bit dance. The
322// `was_clear` signal that gates `notify_dirty` is the same `prev == 0`
323// check the atomic returns — semantics are byte-for-byte identical to
324// the prior implementation, but the per-mutation cost drops from a
325// lock-acquire round trip to one cache-line atomic.
326//
327// `Arc` is retained only because each user clones the same receiver via
328// `MutChannelData::new_receiver`, so the inner mask must be shared. The
329// notifier is `Arc<OnceLock<...>>` for the same reason.
330/// Per-user receiver that accumulates dirty bits for a single component and notifies the user's [`DirtySet`] on first mutation.
331#[derive(Clone)]
332pub struct MutReceiver {
333 mask: Arc<AtomicDiffMask>,
334 notifier: Arc<OnceLock<DirtyNotifier>>,
335}
336
337impl MutReceiver {
338 /// Creates a `MutReceiver` with an atomic diff mask of `diff_mask_length` bytes.
339 pub fn new(diff_mask_length: u8) -> Self {
340 Self {
341 mask: Arc::new(AtomicDiffMask::new(diff_mask_length)),
342 notifier: Arc::new(OnceLock::new()),
343 }
344 }
345
346 /// Installed once per receiver by UserDiffHandler::register_component.
347 /// Cheap no-op on re-attachment (OnceLock::set returns Err, ignored).
348 pub fn attach_notifier(&self, notifier: DirtyNotifier) {
349 let _ = self.notifier.set(notifier);
350 }
351
352 /// Snapshot the receiver's current mask into an owned `DiffMask`.
353 /// Used by `world_writer` when copying the mask into `sent_updates`
354 /// before clearing the receiver. Replaces the prior
355 /// `RwLockReadGuard<'_, DiffMask>` API which forced callers to clone
356 /// while holding a read lock.
357 pub fn mask_snapshot(&self) -> DiffMask {
358 self.mask.snapshot()
359 }
360
361 /// Read one byte of the receiver's mask. Cheaper than `mask_snapshot()`
362 /// when callers only need a single byte (currently unused but kept as
363 /// the obvious primitive on top of the atomic representation).
364 pub fn mask_byte(&self, index: usize) -> u8 {
365 self.mask.byte(index)
366 }
367
368 /// Returns `true` if no property bits are currently set in this receiver's diff mask.
369 pub fn diff_mask_is_clear(&self) -> bool {
370 self.mask.is_clear()
371 }
372
373 /// Marks `property_index` dirty in the diff mask, notifying the dirty queue if the mask transitions from clean to dirty.
374 pub fn mutate(&self, property_index: u8) {
375 let was_clear = self.mask.set_bit(property_index);
376 if was_clear {
377 if let Some(n) = self.notifier.get() {
378 n.notify_dirty();
379 }
380 }
381 }
382
383 /// ORs `other_mask` into the diff mask, notifying the dirty queue if the mask transitions from clean to dirty.
384 pub fn or_mask(&self, other_mask: &DiffMask) {
385 let was_clear_now_dirty = self.mask.or_with(other_mask);
386 if was_clear_now_dirty {
387 if let Some(n) = self.notifier.get() {
388 n.notify_dirty();
389 }
390 }
391 }
392
393 /// Clears all bits in the diff mask and notifies the dirty queue if the mask was dirty.
394 pub fn clear_mask(&self) {
395 let was_dirty = self.mask.clear();
396 if was_dirty {
397 if let Some(n) = self.notifier.get() {
398 n.notify_clean();
399 }
400 }
401 }
402}
403
404/// Write-only handle that forwards property-mutation notifications into a [`MutChannel`].
405#[derive(Clone)]
406pub struct MutSender {
407 channel: MutChannel,
408}
409
410impl MutSender {
411 /// Creates a `MutSender` backed by `channel`.
412 pub fn new(channel: &MutChannel) -> Self {
413 Self {
414 channel: channel.clone(),
415 }
416 }
417}
418
419impl PropertyMutate for MutSender {
420 fn mutate(&mut self, property_index: u8) -> bool {
421
422 self.channel.send(property_index)
423 }
424}
425
426/// Factory that produces per-user [`MutReceiver`]s from a shared [`MutChannel`].
427pub struct MutReceiverBuilder {
428 channel: MutChannel,
429}
430
431impl MutReceiverBuilder {
432 /// Creates a `MutReceiverBuilder` backed by `channel`.
433 pub fn new(channel: &MutChannel) -> Self {
434 Self {
435 channel: channel.clone(),
436 }
437 }
438
439 /// Builds a new [`MutReceiver`] for `address`, or `None` if the channel excludes this address.
440 pub fn build(&self, address: &Option<SocketAddr>) -> Option<MutReceiver> {
441 self.channel.new_receiver(address)
442 }
443}
444
#[cfg(test)]
mod dirty_queue_unlimited_kinds_tests {
    //! Pins the post-T1.3 invariant: the per-user `DirtyQueue` is no
    //! longer capped at 64 component kinds. The flat-strided
    //! `Vec<AtomicU64>` storage scales with `kind_count`, and
    //! `kind_bit` values ≥ 64 round-trip through `push` → `drain`.
    use super::*;
    use crate::EntityIndex;
    use std::sync::Arc;

    #[test]
    fn stride_grows_with_kind_count() {
        // (kind_count, expected words per entity)
        let cases: [(u16, usize); 6] = [(1, 1), (64, 1), (65, 2), (128, 2), (129, 3), (1024, 16)];
        for (kind_count, expected_stride) in cases {
            assert_eq!(DirtyQueue::new(kind_count).stride(), expected_stride);
        }
    }

    #[test]
    fn kind_bit_above_64_round_trips() {
        // Pre-T1.3 the bits ≥ 64 were unrepresentable (the assertion in
        // ComponentKinds::add_component capped registration at 64).
        let pushed: [u16; 7] = [0, 63, 64, 65, 127, 128, 199];

        let queue = Arc::new(DirtyQueue::new(200));
        queue.ensure_capacity(0);
        for kind_bit in pushed {
            queue.push(EntityIndex(0), kind_bit);
        }

        let drained = queue.drain();
        assert_eq!(drained.len(), 1);
        let (idx, words) = &drained[0];
        assert_eq!(*idx, EntityIndex(0));
        assert_eq!(words.len(), queue.stride());

        // Turn the per-word bitmaps back into absolute bit positions.
        let mut recovered: Vec<u16> = words
            .iter()
            .enumerate()
            .flat_map(|(w, &word)| {
                (0..64u16)
                    .filter(move |bit| word & (1u64 << *bit) != 0)
                    .map(move |bit| (w as u16) * 64 + bit)
            })
            .collect();
        recovered.sort();
        assert_eq!(recovered, vec![0, 63, 64, 65, 127, 128, 199]);
    }

    #[test]
    fn cancel_clears_high_kind_bit() {
        let queue = DirtyQueue::new(200);
        queue.ensure_capacity(0);
        queue.push(EntityIndex(0), 130);
        queue.cancel(EntityIndex(0), 130);
        // The cancelled bit leaves all words zero, so drain skips the entry.
        assert!(queue.drain().is_empty());
    }

    #[test]
    fn multi_word_was_clear_fires_index_push_once() {
        let queue = DirtyQueue::new(200);
        queue.ensure_capacity(0);
        // Two pushes hitting different words of the same entity. The index
        // list may hold the entity more than once, but drain's swap-zero
        // collapses that to a single drained entry.
        queue.push(EntityIndex(0), 5);
        queue.push(EntityIndex(0), 130);

        let drained = queue.drain();
        assert_eq!(drained.len(), 1, "expected dedup via drain swap-zero");
        let words = &drained[0].1;
        assert_eq!(words[0], 1u64 << 5);
        assert_eq!(words[2], 1u64 << (130 - 128));
    }
}